diff --git a/cmd/flux/logs.go b/cmd/flux/logs.go index d8c27ff274..ba1a981d39 100644 --- a/cmd/flux/logs.go +++ b/cmd/flux/logs.go @@ -20,6 +20,7 @@ import ( "bufio" "context" "encoding/json" + "errors" "fmt" "io" "os" @@ -115,7 +116,7 @@ func logsCmdRun(cmd *cobra.Command, args []string) error { return fmt.Errorf("no argument required") } - pods, err := getPods(ctx, clientset, fluxSelector) + pods, err := getPods(ctx, clientset, logsArgs.fluxNamespace, fluxSelector) if err != nil { return err } @@ -163,13 +164,16 @@ func logsCmdRun(cmd *cobra.Command, args []string) error { return podLogs(ctx, requests) } -func getPods(ctx context.Context, c *kubernetes.Clientset, label string) ([]corev1.Pod, error) { +// getPods searches for all Deployments in the given namespace that match the given label and returns a list of Pods +// from these Deployments. For each Deployment a single Pod is chosen (based on various factors such as the running +// state). If no pod is found, an error is returned. +func getPods(ctx context.Context, c *kubernetes.Clientset, ns string, label string) ([]corev1.Pod, error) { var ret []corev1.Pod opts := metav1.ListOptions{ LabelSelector: label, } - deployList, err := c.AppsV1().Deployments(logsArgs.fluxNamespace).List(ctx, opts) + deployList, err := c.AppsV1().Deployments(ns).List(ctx, opts) if err != nil { return ret, err } @@ -179,7 +183,7 @@ func getPods(ctx context.Context, c *kubernetes.Clientset, label string) ([]core opts := metav1.ListOptions{ LabelSelector: createLabelStringFromMap(label), } - podList, err := c.CoreV1().Pods(logsArgs.fluxNamespace).List(ctx, opts) + podList, err := c.CoreV1().Pods(ns).List(ctx, opts) if err != nil { return ret, err } @@ -196,11 +200,16 @@ func getPods(ctx context.Context, c *kubernetes.Clientset, label string) ([]core } } + if len(ret) == 0 { + return nil, fmt.Errorf("no Flux pods found in namespace %q", ns) + } + return ret, nil } func parallelPodLogs(ctx context.Context, requests []rest.ResponseWrapper) error { reader, writer := io.Pipe() + errReader, errWriter := io.Pipe() wg := &sync.WaitGroup{} wg.Add(len(requests)) @@ -208,7 +217,7 @@ func parallelPodLogs(ctx context.Context, requests []rest.ResponseWrapper) error go func(req rest.ResponseWrapper) { defer wg.Done() if err := logRequest(ctx, req, writer); err != nil { - writer.CloseWithError(err) + fmt.Fprintf(errWriter, "failed getting logs: %s\n", err) return } }(request) @@ -217,20 +226,40 @@ func parallelPodLogs(ctx context.Context, requests []rest.ResponseWrapper) error go func() { wg.Wait() writer.Close() + errWriter.Close() }() - _, err := io.Copy(os.Stdout, reader) - return err + stdoutErrCh := asyncCopy(os.Stdout, reader) + stderrErrCh := asyncCopy(os.Stderr, errReader) + + return errors.Join(<-stdoutErrCh, <-stderrErrCh) +} + +// asyncCopy copies all data from from dst to src asynchronously and returns a channel for reading an error value. +// This is basically an asynchronous wrapper around `io.Copy`. The returned channel is unbuffered and always is sent +// a value (either nil or the error from `io.Copy`) as soon as `io.Copy` returns. +// This function lets you copy from multiple sources into multiple destinations in parallel. +func asyncCopy(dst io.Writer, src io.Reader) <-chan error { + errCh := make(chan error) + go func(errCh chan error) { + _, err := io.Copy(dst, src) + errCh <- err + }(errCh) + + return errCh } func podLogs(ctx context.Context, requests []rest.ResponseWrapper) error { + var retErr error for _, req := range requests { if err := logRequest(ctx, req, os.Stdout); err != nil { - return err + fmt.Fprintf(os.Stderr, "failed getting logs: %s\n", err) + retErr = fmt.Errorf("failed to collect logs from all Flux pods") + continue } } - return nil + return retErr } func createLabelStringFromMap(m map[string]string) string {