diff --git a/cmd/flux/logs.go b/cmd/flux/logs.go index d8c27ff274..981e3b679c 100644 --- a/cmd/flux/logs.go +++ b/cmd/flux/logs.go @@ -20,6 +20,7 @@ import ( "bufio" "context" "encoding/json" + "errors" "fmt" "io" "os" @@ -201,6 +202,7 @@ func getPods(ctx context.Context, c *kubernetes.Clientset, label string) ([]core func parallelPodLogs(ctx context.Context, requests []rest.ResponseWrapper) error { reader, writer := io.Pipe() + errReader, errWriter := io.Pipe() wg := &sync.WaitGroup{} wg.Add(len(requests)) @@ -208,7 +210,7 @@ func parallelPodLogs(ctx context.Context, requests []rest.ResponseWrapper) error go func(req rest.ResponseWrapper) { defer wg.Done() if err := logRequest(ctx, req, writer); err != nil { - writer.CloseWithError(err) + fmt.Fprintf(errWriter, "failed getting logs: %s\n", err) return } }(request) @@ -217,16 +219,34 @@ func parallelPodLogs(ctx context.Context, requests []rest.ResponseWrapper) error go func() { wg.Wait() writer.Close() + errWriter.Close() }() - _, err := io.Copy(os.Stdout, reader) - return err + return errors.Join( + <-asyncCopy(os.Stdout, reader), + <-asyncCopy(os.Stderr, errReader), + ) +} + +// asyncCopy copies all data from from dst to src asynchronously and returns a channel for reading an error value. +// This is basically an asynchronous wrapper around `io.Copy`. The returned channel is unbuffered and always is sent +// a value (either nil or the error from `io.Copy`) as soon as `io.Copy` returns. +// This function lets you copy from multiple sources into multiple destinations in parallel. +func asyncCopy(dst io.Writer, src io.Reader) <-chan error { + errCh := make(chan error) + go func(errCh chan error) { + _, err := io.Copy(dst, src) + errCh <- err + }(errCh) + + return errCh } func podLogs(ctx context.Context, requests []rest.ResponseWrapper) error { for _, req := range requests { if err := logRequest(ctx, req, os.Stdout); err != nil { - return err + fmt.Fprintf(os.Stderr, "failed getting logs: %s\n", err) + continue } }