diff --git a/pkg/agent/packetcapture/capture/pcap_linux.go b/pkg/agent/packetcapture/capture/pcap_linux.go index 29858b8cf8a..0656a118123 100644 --- a/pkg/agent/packetcapture/capture/pcap_linux.go +++ b/pkg/agent/packetcapture/capture/pcap_linux.go @@ -40,27 +40,29 @@ func NewPcapCapture() (*pcapCapture, error) { } func (p *pcapCapture) Capture(ctx context.Context, device string, srcIP, dstIP net.IP, packet *crdv1alpha1.Packet) (chan gopacket.Packet, error) { - eth, err := pcapgo.NewEthernetHandle(device) + // Compile the BPF filter in advance to reduce the time window between starting the capture and applying the filter. + inst := compilePacketFilter(packet, srcIP, dstIP) + klog.V(5).InfoS("Generated bpf instructions for PacketCapture", "device", device, "srcIP", srcIP, "dstIP", dstIP, "packetSpec", packet, "bpf", inst) + rawInst, err := bpf.Assemble(inst) if err != nil { return nil, err } - eth.SetPromiscuous(false) - eth.SetCaptureLength(maxSnapshotBytes) - - inst := compilePacketFilter(packet, srcIP, dstIP) - klog.V(5).InfoS("Generated bpf instructions for Packetcapture", "device", device, "srcIP", srcIP, "dstIP", dstIP, "packetSpec", packet, "bpf", inst) - rawInst, err := bpf.Assemble(inst) + eth, err := pcapgo.NewEthernetHandle(device) if err != nil { return nil, err } - err = eth.SetBPF(rawInst) - if err != nil { + if err = eth.SetPromiscuous(false); err != nil { + return nil, err + } + if err = eth.SetBPF(rawInst); err != nil { + return nil, err + } + if err = eth.SetCaptureLength(maxSnapshotBytes); err != nil { return nil, err } - packetSource := gopacket.NewPacketSource(eth, layers.LinkTypeEthernet) - packetSource.NoCopy = true + packetSource := gopacket.NewPacketSource(eth, layers.LinkTypeEthernet, gopacket.WithNoCopy(true)) return packetSource.PacketsCtx(ctx), nil } diff --git a/pkg/agent/packetcapture/capture/pcap_windows.go b/pkg/agent/packetcapture/capture/pcap_unsupported.go similarity index 86% rename from pkg/agent/packetcapture/capture/pcap_windows.go rename to pkg/agent/packetcapture/capture/pcap_unsupported.go index 323e027abfd..9ccb908bca4 100644 --- a/pkg/agent/packetcapture/capture/pcap_windows.go +++ b/pkg/agent/packetcapture/capture/pcap_unsupported.go @@ -1,3 +1,6 @@ +//go:build !linux +// +build !linux + // Copyright 2024 Antrea Authors. // // Licensed under the Apache License, Version 2.0 (the "License"); @@ -28,9 +31,9 @@ type pcapCapture struct { } func NewPcapCapture() (*pcapCapture, error) { - return nil, errors.New("PacketCapture is not implemented on Windows") + return nil, errors.New("PacketCapture is not implemented") } func (p *pcapCapture) Capture(ctx context.Context, device string, srcIP, dstIP net.IP, packet *crdv1alpha1.Packet) (chan gopacket.Packet, error) { - return nil, errors.New("PacketCapture is not implemented on Windows") + return nil, errors.New("PacketCapture is not implemented") } diff --git a/pkg/agent/packetcapture/packetcapture_controller.go b/pkg/agent/packetcapture/packetcapture_controller.go index dd29cb2f376..3691f0cd895 100644 --- a/pkg/agent/packetcapture/packetcapture_controller.go +++ b/pkg/agent/packetcapture/packetcapture_controller.go @@ -68,8 +68,8 @@ const ( controllerName = "PacketCaptureController" resyncPeriod time.Duration = 0 - minRetryDelay = 5 * time.Second - maxRetryDelay = 60 * time.Second + minRetryDelay = 500 * time.Millisecond + maxRetryDelay = 30 * time.Second defaultWorkers = 4 @@ -85,7 +85,7 @@ const ( type packetCapturePhase string const ( - packetCapturePhasePending packetCapturePhase = "" + packetCapturePhasePending packetCapturePhase = "Pending" packetCapturePhaseStarted packetCapturePhase = "Started" packetCapturePhaseComplete packetCapturePhase = "Complete" ) @@ -106,8 +106,10 @@ type packetCaptureState struct { phase packetCapturePhase // filePath is the final path shown in PacketCapture's status. filePath string - // err is the latest error observed in the capture. - err error + // captureErr is the error observed during the capturing phase. + captureErr error + // uploadErr is the error observed during the uploading phase. + uploadErr error // cancel is the cancel function for capture context. cancel context.CancelFunc } @@ -176,15 +178,15 @@ func (c *Controller) enqueuePacketCapture(pc *crdv1alpha1.PacketCapture) { func (c *Controller) Run(stopCh <-chan struct{}) { defer c.queue.ShutDown() - klog.InfoS("Starting packetcapture controller", "name", controllerName) - defer klog.InfoS("Shutting down packetcapture controller", "name", controllerName) + klog.InfoS("Starting controller", "name", controllerName) + defer klog.InfoS("Shutting down controller", "name", controllerName) cacheSynced := []cache.InformerSynced{c.packetCaptureSynced} if !cache.WaitForNamedCacheSync(controllerName, stopCh, cacheSynced...) { return } - err := defaultFS.MkdirAll(packetDirectory, 0755) + err := defaultFS.MkdirAll(packetDirectory, 0700) if err != nil { klog.ErrorS(err, "Couldn't create the directory for storing captured packets", "directory", packetDirectory) return @@ -203,16 +205,28 @@ func (c *Controller) addPacketCapture(obj interface{}) { } func (c *Controller) updatePacketCapture(oldObj, newObj interface{}) { - newPc := newObj.(*crdv1alpha1.PacketCapture) - oldPc := oldObj.(*crdv1alpha1.PacketCapture) - if newPc.Generation != oldPc.Generation { - klog.V(2).InfoS("Processing PacketCapture UPDATE event", "name", newPc.Name) - c.enqueuePacketCapture(newPc) + newPC := newObj.(*crdv1alpha1.PacketCapture) + oldPC := oldObj.(*crdv1alpha1.PacketCapture) + if newPC.Generation != oldPC.Generation { + klog.V(2).InfoS("Processing PacketCapture UPDATE event", "name", newPC.Name) + c.enqueuePacketCapture(newPC) } } func (c *Controller) deletePacketCapture(obj interface{}) { - pc := obj.(*crdv1alpha1.PacketCapture) + pc, ok := obj.(*crdv1alpha1.PacketCapture) + if !ok { + deletedState, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + klog.Errorf("Received unexpected object: %v", obj) + return + } + pc, ok = deletedState.Obj.(*crdv1alpha1.PacketCapture) + if !ok { + klog.Errorf("DeletedFinalStateUnknown contains non-PacketCapture object: %v", deletedState.Obj) + return + } + } klog.V(2).InfoS("Processing PacketCapture DELETE event", "name", pc.Name) c.enqueuePacketCapture(pc) } @@ -242,76 +256,59 @@ func (c *Controller) processPacketCaptureItem() bool { } func (c *Controller) syncPacketCapture(pcName string) error { - cleanupStatus := func() { - c.mutex.Lock() - defer c.mutex.Unlock() - state := c.captures[pcName] - if state != nil { - if state.cancel != nil { - state.cancel() - } - delete(c.captures, pcName) - } - } - pc, err := c.packetCaptureLister.Get(pcName) - if apierrors.IsNotFound(err) { + // Lister.Get only returns error when the resource is not found. + if err != nil { c.cleanupPacketCapture(pcName) - cleanupStatus() return nil } // Capture will not occur on this Node if a corresponding Pod interface is not found. device := c.getTargetCaptureDevice(pc) if device == "" { - klog.V(4).InfoS("Skipping process PacketCapture", "name", pcName) - return nil - } - - if err := c.validatePacketCapture(&pc.Spec); err != nil { - klog.ErrorS(err, "Invalid PacketCapture", "name", pc.Name) - if updateErr := c.updateStatus(context.Background(), pcName, packetCaptureState{err: err}); updateErr != nil { - klog.ErrorS(err, "Failed to update PacketCapture status", "name", pc.Name) - } - cleanupStatus() + klog.V(4).InfoS("Skipping unrelated PacketCapture", "name", pcName) return nil } - state := func() packetCaptureState { + state, err := func() (packetCaptureState, error) { c.mutex.Lock() defer c.mutex.Unlock() state := c.captures[pcName] if state == nil { - state = &packetCaptureState{targetCapturedPacketsNum: pc.Spec.CaptureConfig.FirstN.Number} + state = &packetCaptureState{ + phase: packetCapturePhasePending, + targetCapturedPacketsNum: pc.Spec.CaptureConfig.FirstN.Number, + } c.captures[pcName] = state } - phase := state.phase - klog.InfoS("Syncing PacketCapture", "name", pcName, "phase", phase) - if phase != packetCapturePhasePending { - return *state - } + klog.V(2).InfoS("Processing PacketCapture", "name", pcName, "phase", state.phase) + if state.phase != packetCapturePhasePending { + return *state, nil + } + // Do not return the error as it's not a transient error. + if err := c.validatePacketCapture(&pc.Spec); err != nil { + state.captureErr = err + return *state, nil + } + // Return the error as it's a transient error. if c.numRunningCaptures >= maxConcurrentCaptures { - err = fmt.Errorf("PacketCapture running count reach limit") - } else { - // crd spec make sure it's not nil - timeout := time.Duration(*pc.Spec.Timeout) * time.Second - ctx, cancel := context.WithTimeout(context.Background(), timeout) - state.cancel = cancel - if err = c.startPacketCapture(ctx, pc, state, device); err != nil { - phase = packetCapturePhaseComplete - } else { - phase = packetCapturePhaseStarted - c.numRunningCaptures += 1 - } + state.captureErr = fmt.Errorf("PacketCapture running count reach limit") + return *state, state.captureErr } - state.phase = phase - state.err = err - c.captures[pcName] = state - return *state + + // The OpenAPI schema for the CRD makes sure Spec.Timeout is not nil. + timeout := time.Duration(*pc.Spec.Timeout) * time.Second + ctx, cancel := context.WithTimeout(context.Background(), timeout) + state.cancel = cancel + state.phase = packetCapturePhaseStarted + // Start the capture goroutine in a separate goroutine. The goroutine will decrease numRunningCaptures on exit. + c.numRunningCaptures += 1 + go c.startCapture(ctx, pc, state, device) + return *state, nil }() - if updateErr := c.updateStatus(context.Background(), pcName, state); updateErr != nil { + if updateErr := c.updateStatus(context.Background(), pc, state); updateErr != nil { return fmt.Errorf("error when patching status: %w", updateErr) } return err @@ -338,26 +335,30 @@ func (c *Controller) cleanupPacketCapture(pcName string) { } else { klog.ErrorS(err, "Failed to delete the captured pcap file", "name", pcName, "path", path) } + c.mutex.Lock() + defer c.mutex.Unlock() + state := c.captures[pcName] + if state != nil { + if state.cancel != nil { + state.cancel() + } + delete(c.captures, pcName) + } } -func getPacketFileAndWriter(name string) (afero.File, *pcapgo.NgWriter, error) { - filePath := nameToPath(name) +func getPacketFile(filePath string) (afero.File, error) { var file afero.File if _, err := os.Stat(filePath); err == nil { klog.InfoS("Packet file already exists. This may be caused by an unexpected termination, will delete it", "path", filePath) if err := defaultFS.Remove(filePath); err != nil { - return nil, nil, err + return nil, err } } file, err := defaultFS.Create(filePath) if err != nil { - return nil, nil, fmt.Errorf("failed to create pcapng file: %w", err) - } - writer, err := pcapgo.NewNgWriter(file, layers.LinkTypeEthernet) - if err != nil { - return nil, nil, fmt.Errorf("couldn't initialize a pcap writer: %w", err) + return nil, fmt.Errorf("failed to create pcapng file: %w", err) } - return file, writer, nil + return file, nil } // getTargetCaptureDevice is trying to locate the target device for packet capture. If the target @@ -381,97 +382,114 @@ func (c *Controller) getTargetCaptureDevice(pc *crdv1alpha1.PacketCapture) strin return podInterfaces[0].InterfaceName } -// startPacketCapture starts the capture on the target device. The actual capture process will be started -// in a separated go routine. -func (c *Controller) startPacketCapture(ctx context.Context, pc *crdv1alpha1.PacketCapture, pcState *packetCaptureState, device string) error { - klog.V(2).InfoS("Started processing PacketCapture on the current Node", "name", pc.Name, "device", device) - go func() { - captureErr := c.performCapture(ctx, pc, pcState, device) - func() { - c.mutex.Lock() - defer c.mutex.Unlock() - c.numRunningCaptures -= 1 - state := c.captures[pc.Name] - if state != nil { - state.phase = packetCapturePhaseComplete - state.err = captureErr - } +func (c *Controller) startCapture(ctx context.Context, pc *crdv1alpha1.PacketCapture, state *packetCaptureState, device string) { + klog.InfoS("Starting packet capture on the current Node", "name", pc.Name, "device", device) + defer klog.InfoS("Stopped packet capture on the current Node", "name", pc.Name, "device", device) + // Resync the PacketCapture on exit of the capture goroutine. + defer c.enqueuePacketCapture(pc) + + var filePath string + var captureErr, uploadErr error + func() { + localFilePath := nameToPath(pc.Name) + file, err := getPacketFile(localFilePath) + if err != nil { + captureErr = err + return + } + defer file.Close() - }() - c.enqueuePacketCapture(pc) + var capturedAny bool + capturedAny, captureErr = c.performCapture(ctx, pc, state, file, device) + // If nothing is captured, no need to proceed. + if !capturedAny { + return + } + // If any is captured, upload it if required and update filePath in the status of the PacketCapture. + filePath = env.GetPodName() + ":" + localFilePath + + if pc.Spec.FileServer == nil { + return + } + // It can't use the same context as performCapture because it might have timed out. + if uploadErr = c.uploadPackets(context.TODO(), pc, file); uploadErr != nil { + return + } + filePath = fmt.Sprintf("%s/%s.pcapng", pc.Spec.FileServer.URL, pc.Name) }() - return nil + + if captureErr != nil { + klog.ErrorS(captureErr, "PacketCapture failed capturing packets", "name", pc.Name) + } + if uploadErr != nil { + klog.ErrorS(uploadErr, "PacketCapture failed uploading packets", "name", pc.Name) + } + + c.mutex.Lock() + defer c.mutex.Unlock() + state.phase = packetCapturePhaseComplete + state.filePath = filePath + state.captureErr = captureErr + state.uploadErr = uploadErr + c.numRunningCaptures -= 1 } +// performCapture blocks until either the target number of packets have been captured, the context is canceled, or the +// context reaches its deadline. +// It returns a boolean indicating whether any packet is captured, and an error if the target number of packets are not +// captured. func (c *Controller) performCapture( ctx context.Context, pc *crdv1alpha1.PacketCapture, captureState *packetCaptureState, + file afero.File, device string, -) error { +) (bool, error) { srcIP, dstIP, err := c.parseIPs(ctx, pc) if err != nil { - return err + return false, err } - pcapngFile, pcapngWriter, err := getPacketFileAndWriter(pc.Name) + pcapngWriter, err := pcapgo.NewNgWriter(file, layers.LinkTypeEthernet) if err != nil { - return err + return false, fmt.Errorf("couldn't initialize a pcap writer: %w", err) } + defer pcapngWriter.Flush() updateRateLimiter := rate.NewLimiter(rate.Every(captureStatusUpdatePeriod), 1) packets, err := c.captureInterface.Capture(ctx, device, srcIP, dstIP, pc.Spec.Packet) if err != nil { - klog.ErrorS(err, "Failed to start capture") - return err + return false, err } - klog.InfoS("Starting packet capture", "name", pc.Name, "device", device) + // Track whether any packet is captured. + capturedAny := false for { select { case packet := <-packets: - c.mutex.Lock() - captureState.capturedPacketsNum++ - reachTarget := captureState.isCaptureSuccessful() - klog.V(5).InfoS("Captured packets count", "name", pc.Name, "count", captureState.capturedPacketsNum) - c.mutex.Unlock() ci := gopacket.CaptureInfo{ Timestamp: time.Now(), CaptureLength: len(packet.Data()), Length: len(packet.Data()), } - err = pcapngWriter.WritePacket(ci, packet.Data()) - if err != nil { - return fmt.Errorf("couldn't write packets: %w", err) + klog.V(5).InfoS("Captured packet", "name", pc.Name, "len", ci.Length) + if err = pcapngWriter.WritePacket(ci, packet.Data()); err != nil { + return capturedAny, fmt.Errorf("couldn't write packets: %w", err) } - klog.V(5).InfoS("Captured packet length", "name", pc.Name, "len", ci.Length) - - // if reach the target. flush the file and upload it. - if reachTarget { - path := env.GetPodName() + ":" + nameToPath(pc.Name) - statusPath := path - if err = pcapngWriter.Flush(); err != nil { - return err - } - if pc.Spec.FileServer != nil { - err = c.uploadPackets(ctx, pc, pcapngFile) - klog.V(4).InfoS("Upload captured packets", "name", pc.Name, "path", path) - statusPath = fmt.Sprintf("%s/%s.pcapng", pc.Spec.FileServer.URL, pc.Name) - } + capturedAny = true + + if success := func() bool { c.mutex.Lock() - captureState.filePath = statusPath - c.mutex.Unlock() - if err != nil { - return err - } - if err := pcapngFile.Close(); err != nil { - klog.ErrorS(err, "Close pcapng file error", "name", pc.Name, "path", path) - } - return nil - } else if updateRateLimiter.Allow() { - // use rate limiter to reduce the times we need to update status. + defer c.mutex.Unlock() + captureState.capturedPacketsNum++ + klog.V(5).InfoS("Captured packets count", "name", pc.Name, "count", captureState.capturedPacketsNum) + return captureState.isCaptureSuccessful() + }(); success { + return true, nil + } + // use rate limiter to reduce the times we need to update status. + if updateRateLimiter.Allow() { c.enqueuePacketCapture(pc) } - case <-ctx.Done(): - return ctx.Err() + return capturedAny, ctx.Err() } } } @@ -567,119 +585,111 @@ func (c *Controller) uploadPackets(ctx context.Context, pc *crdv1alpha1.PacketCa return uploader.Upload(pc.Spec.FileServer.URL, c.generatePacketsPathForServer(pc.Name), cfg, outputFile) } -func (c *Controller) updateStatus(ctx context.Context, name string, state packetCaptureState) error { - toUpdate, getErr := c.packetCaptureLister.Get(name) - if getErr != nil { - klog.InfoS("Didn't find the original PacketCapture, skip updating status", "name", name) - return nil - } - conditions := []crdv1alpha1.PacketCaptureCondition{} +func (c *Controller) updateStatus(ctx context.Context, pc *crdv1alpha1.PacketCapture, state packetCaptureState) error { + // Make a deepcopy as the object returned from lister must not be updated directly. + toUpdate := pc.DeepCopy() + var conditions []crdv1alpha1.PacketCaptureCondition t := metav1.Now() - updatedStatus := crdv1alpha1.PacketCaptureStatus{ + desiredStatus := crdv1alpha1.PacketCaptureStatus{ NumberCaptured: state.capturedPacketsNum, FilePath: state.filePath, } - if state.err != nil { - updatedStatus.FilePath = "" - if errors.Is(state.err, context.DeadlineExceeded) { - conditions = append(conditions, crdv1alpha1.PacketCaptureCondition{ - Type: crdv1alpha1.PacketCaptureComplete, - Status: metav1.ConditionStatus(v1.ConditionTrue), - LastTransitionTime: t, - Reason: "Timeout", - }) - - } else if state.isCaptureSuccessful() { - // most likely failed to upload after capture succeed. - conditions = append(conditions, crdv1alpha1.PacketCaptureCondition{ - Type: crdv1alpha1.PacketCaptureComplete, - Status: metav1.ConditionStatus(v1.ConditionTrue), + var conditionStarted, conditionComplete, conditionUploaded crdv1alpha1.PacketCaptureCondition + switch state.phase { + case packetCapturePhasePending: + if state.captureErr != nil { + conditionStarted = crdv1alpha1.PacketCaptureCondition{ + Type: crdv1alpha1.PacketCaptureStarted, + Status: metav1.ConditionStatus(v1.ConditionFalse), LastTransitionTime: t, - Reason: "Succeed", - }) + Reason: "NotStarted", + Message: state.captureErr.Error(), + } } else { - conditions = append(conditions, crdv1alpha1.PacketCaptureCondition{ - Type: crdv1alpha1.PacketCaptureComplete, - Status: metav1.ConditionStatus(v1.ConditionFalse), - LastTransitionTime: metav1.Now(), - Reason: "CaptureFailed", - Message: state.err.Error(), - }) - } - if toUpdate.Spec.FileServer != nil && state.filePath != "" { - conditions = append(conditions, crdv1alpha1.PacketCaptureCondition{ - Type: crdv1alpha1.PacketCaptureFileUploaded, + conditionStarted = crdv1alpha1.PacketCaptureCondition{ + Type: crdv1alpha1.PacketCaptureStarted, Status: metav1.ConditionStatus(v1.ConditionFalse), LastTransitionTime: t, - Reason: "UploadFailed", - Message: state.err.Error(), - }) - } - if state.phase == packetCapturePhasePending { - conditions = []crdv1alpha1.PacketCaptureCondition{ - { - Type: crdv1alpha1.PacketCaptureStarted, - Status: metav1.ConditionStatus(v1.ConditionFalse), - LastTransitionTime: t, - Reason: "StartFailed", - Message: state.err.Error(), - }, + Reason: "Pending", } } - } else { - if state.isCaptureSuccessful() { - conditions = []crdv1alpha1.PacketCaptureCondition{ - { - Type: crdv1alpha1.PacketCaptureComplete, - Status: metav1.ConditionStatus(v1.ConditionTrue), - LastTransitionTime: t, - Reason: "Succeed", - }, + conditions = append(conditions, conditionStarted) + case packetCapturePhaseStarted: + conditionStarted = crdv1alpha1.PacketCaptureCondition{ + Type: crdv1alpha1.PacketCaptureStarted, + Status: metav1.ConditionStatus(v1.ConditionTrue), + LastTransitionTime: t, + Reason: "Started", + } + conditionComplete = crdv1alpha1.PacketCaptureCondition{ + Type: crdv1alpha1.PacketCaptureComplete, + Status: metav1.ConditionStatus(v1.ConditionFalse), + LastTransitionTime: t, + Reason: "Progressing", + } + conditions = append(conditions, conditionStarted, conditionComplete) + case packetCapturePhaseComplete: + conditionStarted = crdv1alpha1.PacketCaptureCondition{ + Type: crdv1alpha1.PacketCaptureStarted, + Status: metav1.ConditionStatus(v1.ConditionTrue), + LastTransitionTime: t, + Reason: "Started", + } + reason := "Succeed" + message := "" + if state.captureErr != nil { + if errors.Is(state.captureErr, context.DeadlineExceeded) { + reason = "Timeout" + } else { + reason = "Failed" } - if toUpdate.Spec.FileServer != nil { - conditions = append(conditions, crdv1alpha1.PacketCaptureCondition{ + message = state.captureErr.Error() + } + conditionComplete = crdv1alpha1.PacketCaptureCondition{ + Type: crdv1alpha1.PacketCaptureComplete, + Status: metav1.ConditionStatus(v1.ConditionTrue), + LastTransitionTime: t, + Reason: reason, + Message: message, + } + conditions = append(conditions, conditionStarted, conditionComplete) + // Set Uploaded condition if applicable. + if state.capturedPacketsNum > 0 && pc.Spec.FileServer != nil { + if state.uploadErr != nil { + conditionUploaded = crdv1alpha1.PacketCaptureCondition{ + Type: crdv1alpha1.PacketCaptureFileUploaded, + Status: metav1.ConditionStatus(v1.ConditionFalse), + LastTransitionTime: t, + Reason: "Failed", + Message: state.uploadErr.Error(), + } + } else { + conditionUploaded = crdv1alpha1.PacketCaptureCondition{ Type: crdv1alpha1.PacketCaptureFileUploaded, Status: metav1.ConditionStatus(v1.ConditionTrue), LastTransitionTime: t, Reason: "Succeed", - }) + } } - } else if state.phase == packetCapturePhaseStarted { - conditions = append(conditions, crdv1alpha1.PacketCaptureCondition{ - Type: crdv1alpha1.PacketCaptureStarted, - Status: metav1.ConditionStatus(v1.ConditionTrue), - LastTransitionTime: t, - }) - } else if state.phase == packetCapturePhasePending { - conditions = append(conditions, crdv1alpha1.PacketCaptureCondition{ - Type: crdv1alpha1.PacketCaptureStarted, - Status: metav1.ConditionStatus(v1.ConditionFalse), - LastTransitionTime: t, - }) + conditions = append(conditions, conditionUploaded) } - } - updatedStatus.Conditions = conditions - if retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error { - if toUpdate.Status.FilePath != "" { - updatedStatus.FilePath = toUpdate.Status.FilePath - } - if updatedStatus.NumberCaptured == 0 && toUpdate.Status.NumberCaptured > 0 { - updatedStatus.NumberCaptured = toUpdate.Status.NumberCaptured - } + desiredStatus.Conditions = conditions - updatedStatus.Conditions = mergeConditions(toUpdate.Status.Conditions, updatedStatus.Conditions) - if packetCaptureStatusEqual(toUpdate.Status, updatedStatus) { + if retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error { + if packetCaptureStatusEqual(toUpdate.Status, desiredStatus) { return nil } - toUpdate.Status = updatedStatus - klog.V(2).InfoS("Updating PacketCapture", "name", name, "status", toUpdate.Status) + + desiredStatus.Conditions = mergeConditions(toUpdate.Status.Conditions, desiredStatus.Conditions) + toUpdate.Status = desiredStatus + klog.V(2).InfoS("Updating PacketCapture", "name", pc.Name, "status", toUpdate.Status) _, updateErr := c.crdClient.CrdV1alpha1().PacketCaptures().UpdateStatus(ctx, toUpdate, metav1.UpdateOptions{}) if updateErr != nil && apierrors.IsConflict(updateErr) { var getErr error - if toUpdate, getErr = c.crdClient.CrdV1alpha1().PacketCaptures().Get(ctx, name, metav1.GetOptions{}); getErr != nil { + if toUpdate, getErr = c.crdClient.CrdV1alpha1().PacketCaptures().Get(ctx, pc.Name, metav1.GetOptions{}); getErr != nil { return getErr } } @@ -688,7 +698,7 @@ func (c *Controller) updateStatus(ctx context.Context, name string, state packet }); retryErr != nil { return retryErr } - klog.V(2).InfoS("Updated PacketCapture", "name", name) + klog.V(2).InfoS("Updated PacketCapture", "name", pc.Name) return nil } diff --git a/pkg/agent/packetcapture/packetcapture_controller_test.go b/pkg/agent/packetcapture/packetcapture_controller_test.go index aafe573c27a..1c8c2506ad1 100644 --- a/pkg/agent/packetcapture/packetcapture_controller_test.go +++ b/pkg/agent/packetcapture/packetcapture_controller_test.go @@ -280,7 +280,7 @@ func TestMultiplePacketCaptures(t *testing.T) { } var objs []runtime.Object for i := 0; i < 20; i++ { - objs = append(objs, genTestCR(nameFunc(i), int32(testCaptureNum))) + objs = append(objs, genTestCR(nameFunc(i), testCaptureNum)) } pcc := newFakePacketCaptureController(t, nil, objs) pcc.sftpUploader = &testUploader{url: testFTPUrl} @@ -291,38 +291,39 @@ func TestMultiplePacketCaptures(t *testing.T) { pcc.informerFactory.Start(stopCh) pcc.informerFactory.WaitForCacheSync(stopCh) go pcc.Run(stopCh) - assert.Eventually(t, func() bool { - items, err := pcc.crdClient.CrdV1alpha1().PacketCaptures().List(context.Background(), metav1.ListOptions{}) - if err != nil { - return false - } - for _, result := range items.Items { - for _, cond := range result.Status.Conditions { - if cond.Type == crdv1alpha1.PacketCaptureComplete || cond.Type == crdv1alpha1.PacketCaptureFileUploaded { - if cond.Status == metav1.ConditionFalse { - return false - } + assert.EventuallyWithT(t, func(c *assert.CollectT) { + list, _ := pcc.crdClient.CrdV1alpha1().PacketCaptures().List(context.Background(), metav1.ListOptions{}) + for _, item := range list.Items { + var startedStatus, completeStatus, uploadStatus metav1.ConditionStatus + for _, cond := range item.Status.Conditions { + if cond.Type == crdv1alpha1.PacketCaptureStarted { + startedStatus = cond.Status + } + if cond.Type == crdv1alpha1.PacketCaptureComplete { + completeStatus = cond.Status + } + if cond.Type == crdv1alpha1.PacketCaptureFileUploaded { + uploadStatus = cond.Status } } + assert.Equal(c, metav1.ConditionTrue, startedStatus) + assert.Equal(c, metav1.ConditionTrue, completeStatus) + assert.Equal(c, metav1.ConditionTrue, uploadStatus) } pcc.mutex.Lock() - if pcc.numRunningCaptures != 0 { - return false - } - pcc.mutex.Unlock() - return true + defer pcc.mutex.Unlock() + assert.Equal(c, 0, pcc.numRunningCaptures) + assert.Equal(c, 20, len(pcc.captures)) }, 5*time.Second, 50*time.Millisecond) + for i := 0; i < 20; i++ { err := pcc.crdClient.CrdV1alpha1().PacketCaptures().Delete(context.TODO(), nameFunc(i), metav1.DeleteOptions{}) require.NoError(t, err) } assert.Eventually(t, func() bool { pcc.mutex.Lock() - if len(pcc.captures) != 0 { - return false - } - pcc.mutex.Unlock() - return true + defer pcc.mutex.Unlock() + return len(pcc.captures) == 0 }, 2*time.Second, 20*time.Millisecond) } @@ -332,11 +333,13 @@ func TestPacketCaptureControllerRun(t *testing.T) { pcs := []struct { name string pc *crdv1alpha1.PacketCapture + expectStartedStatus metav1.ConditionStatus expectCompleteStatus metav1.ConditionStatus expectUploadStatus metav1.ConditionStatus }{ { - name: "start packetcapture", + name: "pod-to-pod with file server", + expectStartedStatus: metav1.ConditionTrue, expectCompleteStatus: metav1.ConditionTrue, expectUploadStatus: metav1.ConditionTrue, pc: &crdv1alpha1.PacketCapture{ @@ -370,9 +373,9 @@ func TestPacketCaptureControllerRun(t *testing.T) { }, }, { - name: "parse ip", + name: "pod-to-pod without file server", + expectStartedStatus: metav1.ConditionTrue, expectCompleteStatus: metav1.ConditionTrue, - expectUploadStatus: metav1.ConditionTrue, pc: &crdv1alpha1.PacketCapture{ ObjectMeta: metav1.ObjectMeta{Name: "pc2", UID: "uid2"}, Spec: crdv1alpha1.PacketCaptureSpec{ @@ -396,17 +399,13 @@ func TestPacketCaptureControllerRun(t *testing.T) { Packet: &crdv1alpha1.Packet{ Protocol: &icmpProto, }, - FileServer: &crdv1alpha1.PacketCaptureFileServer{ - URL: "sftp://127.0.0.1:22/aaa", - }, Timeout: &testCaptureTimeout, }, }, }, { - name: "invalid proto", - expectCompleteStatus: metav1.ConditionFalse, - expectUploadStatus: metav1.ConditionFalse, + name: "invalid proto", + expectStartedStatus: metav1.ConditionFalse, pc: &crdv1alpha1.PacketCapture{ ObjectMeta: metav1.ObjectMeta{Name: "pc4", UID: "uid4"}, Spec: crdv1alpha1.PacketCaptureSpec{ @@ -439,6 +438,7 @@ func TestPacketCaptureControllerRun(t *testing.T) { }, { name: "upload failed", + expectStartedStatus: metav1.ConditionTrue, expectCompleteStatus: metav1.ConditionTrue, expectUploadStatus: metav1.ConditionFalse, pc: &crdv1alpha1.PacketCapture{ @@ -489,30 +489,29 @@ func TestPacketCaptureControllerRun(t *testing.T) { go pcc.Run(stopCh) for _, item := range pcs { t.Run(item.name, func(t *testing.T) { - assert.Eventually(t, func() bool { + assert.EventuallyWithT(t, func(c *assert.CollectT) { result, err := pcc.crdClient.CrdV1alpha1().PacketCaptures().Get(context.Background(), item.pc.Name, metav1.GetOptions{}) - if err != nil { - return false - } + require.NoError(c, err) + var startedStatus, completeStatus, uploadStatus metav1.ConditionStatus for _, cond := range result.Status.Conditions { - if cond.Type == crdv1alpha1.PacketCaptureComplete && item.expectCompleteStatus != cond.Status { - return false + if cond.Type == crdv1alpha1.PacketCaptureStarted { + startedStatus = cond.Status + } + if cond.Type == crdv1alpha1.PacketCaptureComplete { + completeStatus = cond.Status } - if cond.Type == crdv1alpha1.PacketCaptureFileUploaded && item.expectUploadStatus != cond.Status { - return false + if cond.Type == crdv1alpha1.PacketCaptureFileUploaded { + uploadStatus = cond.Status } } + assert.Equal(c, item.expectStartedStatus, startedStatus) + assert.Equal(c, item.expectUploadStatus, uploadStatus) + assert.Equal(c, item.expectCompleteStatus, completeStatus) + assert.Equal(c, item.expectUploadStatus, uploadStatus) if item.expectCompleteStatus == metav1.ConditionTrue { - if result.Status.NumberCaptured != testCaptureNum { - return false - } + assert.Equal(c, testCaptureNum, result.Status.NumberCaptured) } - // delete cr - err = pcc.crdClient.CrdV1alpha1().PacketCaptures().Delete(context.TODO(), item.pc.Name, metav1.DeleteOptions{}) - return err == nil - }, 2*time.Second, 20*time.Millisecond) - stopCh <- struct{}{} }) } } diff --git a/test/e2e/framework.go b/test/e2e/framework.go index 6a8af13ad1f..639beb1e8b1 100644 --- a/test/e2e/framework.go +++ b/test/e2e/framework.go @@ -1725,14 +1725,7 @@ func (data *TestData) podWaitForIPs(timeout time.Duration, name, namespace strin if pod.Status.PodIP == "" { return nil, fmt.Errorf("Pod is running but has no assigned IP, which should never happen") } - podIPStrings := sets.New[string](pod.Status.PodIP) - for _, podIP := range pod.Status.PodIPs { - ipStr := strings.TrimSpace(podIP.IP) - if ipStr != "" { - podIPStrings.Insert(ipStr) - } - } - ips, err := parsePodIPs(podIPStrings) + ips, err := parsePodIPs(pod) if err != nil { return nil, err } @@ -1748,7 +1741,14 @@ func (data *TestData) podWaitForIPs(timeout time.Duration, name, namespace strin return ips, nil } -func parsePodIPs(podIPStrings sets.Set[string]) (*PodIPs, error) { +func parsePodIPs(pod *corev1.Pod) (*PodIPs, error) { + podIPStrings := sets.New[string](pod.Status.PodIP) + for _, podIP := range pod.Status.PodIPs { + ipStr := strings.TrimSpace(podIP.IP) + if ipStr != "" { + podIPStrings.Insert(ipStr) + } + } ips := new(PodIPs) for idx := range sets.List(podIPStrings) { ipStr := sets.List(podIPStrings)[idx] diff --git a/test/e2e/nodeportlocal_test.go b/test/e2e/nodeportlocal_test.go index 21bb85d5a16..a677a2733be 100644 --- a/test/e2e/nodeportlocal_test.go +++ b/test/e2e/nodeportlocal_test.go @@ -27,7 +27,6 @@ import ( "github.com/stretchr/testify/require" corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" npltesting "antrea.io/antrea/pkg/agent/nodeportlocal/testing" @@ -115,15 +114,7 @@ func getNPLAnnotations(t *testing.T, data *TestData, r *require.Assertions, test return false, nil } - podIPStrings := sets.New[string](pod.Status.PodIP) - for _, podIP := range pod.Status.PodIPs { - ipStr := strings.TrimSpace(podIP.IP) - if ipStr != "" { - podIPStrings.Insert(ipStr) - } - } - - testPodIP, err = parsePodIPs(podIPStrings) + testPodIP, err = parsePodIPs(pod) if err != nil || testPodIP.IPv4 == nil { return false, nil } diff --git a/test/e2e/packetcapture_test.go b/test/e2e/packetcapture_test.go index 55b49e934bf..c6b8c77cc4a 100644 --- a/test/e2e/packetcapture_test.go +++ b/test/e2e/packetcapture_test.go @@ -26,15 +26,23 @@ import ( "github.com/stretchr/testify/require" appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/conversion" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/utils/ptr" crdv1alpha1 "antrea.io/antrea/pkg/apis/crd/v1alpha1" "antrea.io/antrea/pkg/features" ) +var ( + icmpProto = intstr.FromString("ICMP") + udpProto = intstr.FromString("UDP") + tcpProto = intstr.FromString("TCP") +) + type pcTestCase struct { name string pc *crdv1alpha1.PacketCapture @@ -42,8 +50,6 @@ type pcTestCase struct { // required IP version, skip if not match. ipVersion int - // Source Pod to run ping for live-traffic PacketCapture. - srcPod string } func genSFTPService() *v1.Service { @@ -92,6 +98,14 @@ func genSFTPDeployment() *appsv1.Deployment { Image: "ghcr.io/atmoz/sftp/debian:latest", ImagePullPolicy: v1.PullIfNotPresent, Args: []string{"foo:pass:::upload"}, + ReadinessProbe: &v1.Probe{ + ProbeHandler: v1.ProbeHandler{ + TCPSocket: &v1.TCPSocketAction{ + Port: intstr.FromInt32(int32(22)), + }, + }, + PeriodSeconds: 3, + }, }, }, }, @@ -121,31 +135,28 @@ func TestPacketCapture(t *testing.T) { } defer teardownTest(t, data) - // setup sftp server for test. - secretUserName := "foo" - secretPassword := "pass" - // #nosec G101 - pcSecretName := "antrea-packetcapture-fileserver-auth" - pcSecretNamespace := "kube-system" - _, err = data.clientset.AppsV1().Deployments(data.testNamespace).Create(context.TODO(), genSFTPDeployment(), metav1.CreateOptions{}) + deployment, err := data.clientset.AppsV1().Deployments(data.testNamespace).Create(context.TODO(), genSFTPDeployment(), metav1.CreateOptions{}) require.NoError(t, err) - _, err = data.clientset.CoreV1().Services(data.testNamespace).Create(context.TODO(), genSFTPService(), metav1.CreateOptions{}) + defer data.clientset.AppsV1().Deployments(data.testNamespace).Delete(context.TODO(), deployment.Name, metav1.DeleteOptions{}) + svc, err := data.clientset.CoreV1().Services(data.testNamespace).Create(context.TODO(), genSFTPService(), metav1.CreateOptions{}) require.NoError(t, err) + defer data.clientset.CoreV1().Services(data.testNamespace).Delete(context.TODO(), svc.Name, metav1.DeleteOptions{}) failOnError(data.waitForDeploymentReady(t, data.testNamespace, "sftp", defaultTimeout), t) sec := &v1.Secret{ ObjectMeta: metav1.ObjectMeta{ - Name: pcSecretName, - Namespace: pcSecretNamespace, + // #nosec G101 + Name: "antrea-packetcapture-fileserver-auth", + Namespace: "kube-system", }, Data: map[string][]byte{ - "username": []byte(secretUserName), - "password": []byte(secretPassword), + "username": []byte("foo"), + "password": []byte("pass"), }, } - _, err = data.clientset.CoreV1().Secrets(pcSecretNamespace).Create(context.TODO(), sec, metav1.CreateOptions{}) + _, err = data.clientset.CoreV1().Secrets(sec.Namespace).Create(context.TODO(), sec, metav1.CreateOptions{}) require.NoError(t, err) - defer data.clientset.CoreV1().Secrets(pcSecretNamespace).Delete(context.TODO(), pcSecretName, metav1.DeleteOptions{}) + defer data.clientset.CoreV1().Secrets(sec.Namespace).Delete(context.TODO(), sec.Name, metav1.DeleteOptions{}) t.Run("testPacketCaptureBasic", func(t *testing.T) { testPacketCaptureBasic(t, data) @@ -156,54 +167,46 @@ func TestPacketCapture(t *testing.T) { // testPacketCaptureTCP verifies if PacketCapture can capture tcp packets. this function only contains basic // cases with pod-to-pod. func testPacketCaptureBasic(t *testing.T, data *TestData) { - nodeIdx := 0 - node1 := nodeName(nodeIdx) + node1 := nodeName(0) + clientPodName := "client" tcpServerPodName := "tcp-server" - pcToolboxPodName := "toolbox" udpServerPodName := "udp-server" - icmpProto := intstr.FromString("ICMP") - udpProto := intstr.FromString("UDP") - tcpProto := intstr.FromString("TCP") - testServerPort := int32(80) - pcShortTimeout := int32(5) - nonExistPodName := "non-existing-pod" - testNonExistPort := int32(8085) - - err := createUDPServerPod(udpServerPodName, data.testNamespace, serverPodPort, node1) - defer data.DeletePodAndWait(defaultTimeout, udpServerPodName, data.testNamespace) - require.NoError(t, err) - // test tcp server pod - err = data.createServerPodWithLabels(tcpServerPodName, data.testNamespace, serverPodPort, nil) + nonExistingPodName := "non-existing-pod" + + require.NoError(t, data.createToolboxPodOnNode(clientPodName, data.testNamespace, node1, false)) + defer data.DeletePodAndWait(defaultTimeout, clientPodName, data.testNamespace) + require.NoError(t, data.createServerPodWithLabels(tcpServerPodName, data.testNamespace, serverPodPort, nil)) defer data.DeletePodAndWait(defaultTimeout, tcpServerPodName, data.testNamespace) - require.NoError(t, err) - err = data.createToolboxPodOnNode(pcToolboxPodName, data.testNamespace, node1, false) - defer data.DeletePodAndWait(defaultTimeout, pcToolboxPodName, data.testNamespace) - require.NoError(t, err) - podIPs := waitForPodIPs(t, data, []PodInfo{ - {tcpServerPodName, getOSString(), "", data.testNamespace}, - {pcToolboxPodName, getOSString(), "", data.testNamespace}, + require.NoError(t, createUDPServerPod(udpServerPodName, data.testNamespace, serverPodPort, node1)) + defer data.DeletePodAndWait(defaultTimeout, udpServerPodName, data.testNamespace) + + waitForPodIPs(t, data, []PodInfo{ + {Name: clientPodName}, + {Name: tcpServerPodName}, + {Name: udpServerPodName}, }) - tcpServerPodIP := podIPs[tcpServerPodName].IPv4.String() testcases := []pcTestCase{ { - name: "timeout-case", + name: "ipv4-icmp-timeout", ipVersion: 4, - srcPod: pcToolboxPodName, pc: &crdv1alpha1.PacketCapture{ ObjectMeta: metav1.ObjectMeta{ - Name: randName(fmt.Sprintf("%s-timeout-case-", data.testNamespace)), + Name: "ipv4-icmp-timeout", }, Spec: crdv1alpha1.PacketCaptureSpec{ - Timeout: &pcShortTimeout, + Timeout: ptr.To[int32](15), Source: crdv1alpha1.Source{ Pod: &crdv1alpha1.PodReference{ Namespace: data.testNamespace, - Name: pcToolboxPodName, + Name: clientPodName, }, }, Destination: crdv1alpha1.Destination{ - IP: &tcpServerPodIP, + Pod: &crdv1alpha1.PodReference{ + Namespace: data.testNamespace, + Name: udpServerPodName, + }, }, CaptureConfig: crdv1alpha1.CaptureConfig{ FirstN: &crdv1alpha1.PacketCaptureFirstNConfig{ @@ -214,46 +217,55 @@ func testPacketCaptureBasic(t *testing.T, data *TestData) { URL: fmt.Sprintf("sftp://%s:30010/upload", controlPlaneNodeIPv4()), }, Packet: &crdv1alpha1.Packet{ - Protocol: &tcpProto, + Protocol: &icmpProto, IPFamily: v1.IPv4Protocol, - TransportHeader: crdv1alpha1.TransportHeader{ - TCP: &crdv1alpha1.TCPHeader{ - DstPort: &testNonExistPort, - }, - }, }, }, }, expectedStatus: crdv1alpha1.PacketCaptureStatus{ + NumberCaptured: 10, + FilePath: fmt.Sprintf("sftp://%s:30010/upload/ipv4-icmp-timeout.pcapng", controlPlaneNodeIPv4()), Conditions: []crdv1alpha1.PacketCaptureCondition{ + { + Type: crdv1alpha1.PacketCaptureStarted, + Status: metav1.ConditionStatus(v1.ConditionTrue), + LastTransitionTime: metav1.Now(), + Reason: "Started", + }, { Type: crdv1alpha1.PacketCaptureComplete, Status: metav1.ConditionStatus(v1.ConditionTrue), LastTransitionTime: metav1.Now(), Reason: "Timeout", + Message: "context deadline exceeded", + }, + { + Type: crdv1alpha1.PacketCaptureFileUploaded, + Status: metav1.ConditionStatus(v1.ConditionTrue), + LastTransitionTime: metav1.Now(), + Reason: "Succeed", }, }, }, }, { - name: nonExistPodName, + name: nonExistingPodName, ipVersion: 4, - srcPod: pcToolboxPodName, pc: &crdv1alpha1.PacketCapture{ ObjectMeta: metav1.ObjectMeta{ - Name: randName(fmt.Sprintf("%s-%s-", data.testNamespace, nonExistPodName)), + Name: nonExistingPodName, }, Spec: crdv1alpha1.PacketCaptureSpec{ Source: crdv1alpha1.Source{ Pod: &crdv1alpha1.PodReference{ Namespace: data.testNamespace, - Name: pcToolboxPodName, + Name: clientPodName, }, }, Destination: crdv1alpha1.Destination{ Pod: &crdv1alpha1.PodReference{ Namespace: data.testNamespace, - Name: nonExistPodName, + Name: nonExistingPodName, }, }, CaptureConfig: crdv1alpha1.CaptureConfig{ @@ -268,12 +280,18 @@ func testPacketCaptureBasic(t *testing.T, data *TestData) { }, expectedStatus: crdv1alpha1.PacketCaptureStatus{ Conditions: []crdv1alpha1.PacketCaptureCondition{ + { + Type: crdv1alpha1.PacketCaptureStarted, + Status: metav1.ConditionStatus(v1.ConditionTrue), + LastTransitionTime: metav1.Now(), + Reason: "Started", + }, { Type: crdv1alpha1.PacketCaptureComplete, - Status: metav1.ConditionStatus(v1.ConditionFalse), + Status: metav1.ConditionStatus(v1.ConditionTrue), LastTransitionTime: metav1.Now(), - Reason: "CaptureFailed", - Message: fmt.Sprintf("failed to get Pod %s/%s: pods \"%s\" not found", data.testNamespace, nonExistPodName, nonExistPodName), + Reason: "Failed", + Message: fmt.Sprintf("failed to get Pod %s/%s: pods \"%s\" not found", data.testNamespace, nonExistingPodName, nonExistingPodName), }, }, }, @@ -281,16 +299,15 @@ func testPacketCaptureBasic(t *testing.T, data *TestData) { { name: "ipv4-tcp", ipVersion: 4, - srcPod: pcToolboxPodName, pc: &crdv1alpha1.PacketCapture{ ObjectMeta: metav1.ObjectMeta{ - Name: randName(fmt.Sprintf("%s-ipv4-tcp-", data.testNamespace)), + Name: "ipv4-tcp", }, Spec: crdv1alpha1.PacketCaptureSpec{ Source: crdv1alpha1.Source{ Pod: &crdv1alpha1.PodReference{ Namespace: data.testNamespace, - Name: pcToolboxPodName, + Name: clientPodName, }, }, Destination: crdv1alpha1.Destination{ @@ -312,7 +329,7 @@ func testPacketCaptureBasic(t *testing.T, data *TestData) { IPFamily: v1.IPv4Protocol, TransportHeader: crdv1alpha1.TransportHeader{ TCP: &crdv1alpha1.TCPHeader{ - DstPort: &testServerPort, + DstPort: ptr.To(serverPodPort), }, }, }, @@ -320,7 +337,14 @@ func testPacketCaptureBasic(t *testing.T, data *TestData) { }, expectedStatus: crdv1alpha1.PacketCaptureStatus{ NumberCaptured: 5, + FilePath: fmt.Sprintf("sftp://%s:30010/upload/ipv4-tcp.pcapng", controlPlaneNodeIPv4()), Conditions: []crdv1alpha1.PacketCaptureCondition{ + { + Type: crdv1alpha1.PacketCaptureStarted, + Status: metav1.ConditionStatus(v1.ConditionTrue), + LastTransitionTime: metav1.Now(), + Reason: "Started", + }, { Type: crdv1alpha1.PacketCaptureComplete, Status: metav1.ConditionStatus(v1.ConditionTrue), @@ -339,16 +363,15 @@ func testPacketCaptureBasic(t *testing.T, data *TestData) { { name: "ipv4-udp", ipVersion: 4, - srcPod: pcToolboxPodName, pc: &crdv1alpha1.PacketCapture{ ObjectMeta: metav1.ObjectMeta{ - Name: randName(fmt.Sprintf("%s-ipv4-udp-", data.testNamespace)), + Name: "ipv4-udp", }, Spec: crdv1alpha1.PacketCaptureSpec{ Source: crdv1alpha1.Source{ Pod: &crdv1alpha1.PodReference{ Namespace: data.testNamespace, - Name: pcToolboxPodName, + Name: clientPodName, }, }, Destination: crdv1alpha1.Destination{ @@ -370,7 +393,7 @@ func testPacketCaptureBasic(t *testing.T, data *TestData) { IPFamily: v1.IPv4Protocol, TransportHeader: crdv1alpha1.TransportHeader{ UDP: &crdv1alpha1.UDPHeader{ - DstPort: &testServerPort, + DstPort: ptr.To(serverPodPort), }, }, }, @@ -378,7 +401,14 @@ func testPacketCaptureBasic(t *testing.T, data *TestData) { }, expectedStatus: crdv1alpha1.PacketCaptureStatus{ NumberCaptured: 5, + FilePath: fmt.Sprintf("sftp://%s:30010/upload/ipv4-udp.pcapng", controlPlaneNodeIPv4()), Conditions: []crdv1alpha1.PacketCaptureCondition{ + { + Type: crdv1alpha1.PacketCaptureStarted, + Status: metav1.ConditionStatus(v1.ConditionTrue), + LastTransitionTime: metav1.Now(), + Reason: "Started", + }, { Type: crdv1alpha1.PacketCaptureComplete, Status: metav1.ConditionStatus(v1.ConditionTrue), @@ -397,16 +427,15 @@ func testPacketCaptureBasic(t *testing.T, data *TestData) { { name: "ipv4-icmp", ipVersion: 4, - srcPod: pcToolboxPodName, pc: &crdv1alpha1.PacketCapture{ ObjectMeta: metav1.ObjectMeta{ - Name: randName(fmt.Sprintf("%s-ipv4-icmp-", data.testNamespace)), + Name: "ipv4-icmp", }, Spec: crdv1alpha1.PacketCaptureSpec{ Source: crdv1alpha1.Source{ Pod: &crdv1alpha1.PodReference{ Namespace: data.testNamespace, - Name: pcToolboxPodName, + Name: clientPodName, }, }, Destination: crdv1alpha1.Destination{ @@ -431,7 +460,14 @@ func testPacketCaptureBasic(t *testing.T, data *TestData) { }, expectedStatus: crdv1alpha1.PacketCaptureStatus{ NumberCaptured: 5, + FilePath: fmt.Sprintf("sftp://%s:30010/upload/ipv4-icmp.pcapng", controlPlaneNodeIPv4()), Conditions: []crdv1alpha1.PacketCaptureCondition{ + { + Type: crdv1alpha1.PacketCaptureStarted, + Status: metav1.ConditionStatus(v1.ConditionTrue), + LastTransitionTime: metav1.Now(), + Reason: "Started", + }, { Type: crdv1alpha1.PacketCaptureComplete, Status: metav1.ConditionStatus(v1.ConditionTrue), @@ -464,29 +500,29 @@ func getOSString() string { } func runPacketCaptureTest(t *testing.T, data *TestData, tc pcTestCase) { - pcToolboxPodName := "toolbox" - icmpProto := intstr.FromString("ICMP") - udpProto := intstr.FromString("UDP") - tcpProto := intstr.FromString("TCP") - nonExistPodName := "non-existing-pod" switch tc.ipVersion { case 4: skipIfNotIPv4Cluster(t) case 6: skipIfNotIPv6Cluster(t) } - // wait for toolbox - waitForPodIPs(t, data, []PodInfo{{pcToolboxPodName, getOSString(), "", data.testNamespace}}) - dstPodName := "" - if tc.pc.Spec.Destination.Pod != nil { - dstPodName = tc.pc.Spec.Destination.Pod.Name - } var dstPodIPs *PodIPs - if dstPodName != nonExistPodName && dstPodName != "" { - // wait for pods to be ready first - podIPs := waitForPodIPs(t, data, []PodInfo{{dstPodName, getOSString(), "", data.testNamespace}}) - dstPodIPs = podIPs[dstPodName] + if tc.pc.Spec.Destination.IP != nil { + ip := net.ParseIP(*tc.pc.Spec.Destination.IP) + if ip.To4() != nil { + dstPodIPs = &PodIPs{IPv4: &ip} + } else { + dstPodIPs = &PodIPs{IPv6: &ip} + } + } else if tc.pc.Spec.Destination.Pod != nil { + pod, err := data.clientset.CoreV1().Pods(tc.pc.Spec.Destination.Pod.Namespace).Get(context.TODO(), tc.pc.Spec.Destination.Pod.Name, metav1.GetOptions{}) + if err != nil { + require.True(t, errors.IsNotFound(err)) + } else { + dstPodIPs, err = parsePodIPs(pod) + require.NoError(t, err) + } } if _, err := data.crdClient.CrdV1alpha1().PacketCaptures().Create(context.TODO(), tc.pc, metav1.CreateOptions{}); err != nil { @@ -498,24 +534,15 @@ func runPacketCaptureTest(t *testing.T, data *TestData, tc pcTestCase) { } }() - if !strings.Contains(tc.pc.Name, "non-exist") && !strings.Contains(tc.pc.Name, "timeout") { - srcPod := tc.srcPod - if dstIP := tc.pc.Spec.Destination.IP; dstIP != nil { - ip := net.ParseIP(*dstIP) - if ip.To4() != nil { - dstPodIPs = &PodIPs{IPv4: &ip} - } else { - dstPodIPs = &PodIPs{IPv6: &ip} - } - } - time.Sleep(time.Second * 2) + // The destination is unset or invalid, do not generate traffic as the test expects to fail. + if dstPodIPs != nil { + srcPod := tc.pc.Spec.Source.Pod.Name protocol := *tc.pc.Spec.Packet.Protocol server := dstPodIPs.IPv4.String() if tc.ipVersion == 6 { server = dstPodIPs.IPv6.String() } // wait for CR running. - _, err := data.waitForPacketCapture(t, tc.pc.Name, 0, isPacketCaptureRunning) if err != nil { t.Fatalf("Error: Waiting PacketCapture to Running failed: %v", err) @@ -528,13 +555,13 @@ func runPacketCaptureTest(t *testing.T, data *TestData, tc pcTestCase) { } } else if protocol == tcpProto { for i := 1; i <= 10; i++ { - if err := data.runNetcatCommandFromTestPodWithProtocol(tc.srcPod, data.testNamespace, toolboxContainerName, server, serverPodPort, "tcp"); err != nil { + if err := data.runNetcatCommandFromTestPodWithProtocol(srcPod, data.testNamespace, toolboxContainerName, server, serverPodPort, "tcp"); err != nil { t.Logf("Netcat(TCP) '%s' -> '%v' failed: ERROR (%v)", srcPod, server, err) } } } else if protocol == udpProto { for i := 1; i <= 10; i++ { - if err := data.runNetcatCommandFromTestPodWithProtocol(tc.srcPod, data.testNamespace, toolboxContainerName, server, serverPodPort, "udp"); err != nil { + if err := data.runNetcatCommandFromTestPodWithProtocol(srcPod, data.testNamespace, toolboxContainerName, server, serverPodPort, "udp"); err != nil { t.Logf("Netcat(UDP) '%s' -> '%v' failed: ERROR (%v)", srcPod, server, err) } } @@ -543,34 +570,19 @@ func runPacketCaptureTest(t *testing.T, data *TestData, tc pcTestCase) { timeout := tc.pc.Spec.Timeout if timeout == nil { - tv := int32(15) - timeout = &tv + // It may take some time to upload. + timeout = ptr.To[int32](15) } if strings.Contains(tc.name, "timeout") { // wait more for status update. - tv := *timeout + int32(10) - timeout = &tv + timeout = ptr.To[int32](*timeout + 5) } - pc, err := data.waitForPacketCapture(t, tc.pc.Name, int(*timeout), isPacketCaptureReady) + pc, err := data.waitForPacketCapture(t, tc.pc.Name, int(*timeout), isPacketCaptureComplete) if err != nil { t.Fatalf("Error: Get PacketCapture failed: %v", err) } - tc.expectedStatus.FilePath = pc.Status.FilePath - if strings.Contains(tc.name, "timeout") { - // if can be 0 or less thant target number. - tc.expectedStatus.NumberCaptured = pc.Status.NumberCaptured - } - // remove pending condition as it's random - newCond := []crdv1alpha1.PacketCaptureCondition{} - for _, cond := range pc.Status.Conditions { - if cond.Type == crdv1alpha1.PacketCaptureStarted { - continue - } - newCond = append(newCond, cond) - } - pc.Status.Conditions = newCond if !packetCaptureStatusEqual(pc.Status, tc.expectedStatus) { t.Errorf("CR status not match, actual: %+v, expected: %+v", pc.Status, tc.expectedStatus) } @@ -602,12 +614,9 @@ func (data *TestData) waitForPacketCapture(t *testing.T, name string, specTimeou return pc, nil } -func isPacketCaptureReady(pc *crdv1alpha1.PacketCapture) bool { - if len(pc.Status.Conditions) == 0 { - return false - } +func isPacketCaptureComplete(pc *crdv1alpha1.PacketCapture) bool { for _, cond := range pc.Status.Conditions { - if cond.Type == crdv1alpha1.PacketCaptureComplete { + if cond.Type == crdv1alpha1.PacketCaptureComplete && cond.Status == metav1.ConditionTrue { return true } } @@ -616,9 +625,6 @@ func isPacketCaptureReady(pc *crdv1alpha1.PacketCapture) bool { } func isPacketCaptureRunning(pc *crdv1alpha1.PacketCapture) bool { - if len(pc.Status.Conditions) == 0 { - return false - } for _, cond := range pc.Status.Conditions { if cond.Type == crdv1alpha1.PacketCaptureStarted && cond.Status == metav1.ConditionTrue { return true