From bf38b8480cad903b46c7b6acc36816426c1eb216 Mon Sep 17 00:00:00 2001 From: ahrav Date: Tue, 15 Oct 2024 12:11:45 -0700 Subject: [PATCH] [fix] - resource leak (#3402) * fix resource leak * add comment * use errors.Join * address error wrapping --- pkg/handlers/handlers.go | 9 ++---- pkg/sources/git/git.go | 59 +++++++++++++++++++++++++++++++++++++--- 2 files changed, 57 insertions(+), 11 deletions(-) diff --git a/pkg/handlers/handlers.go b/pkg/handlers/handlers.go index 8f1f5e106bef..1561d826d0d5 100644 --- a/pkg/handlers/handlers.go +++ b/pkg/handlers/handlers.go @@ -285,14 +285,9 @@ func HandleFile( } defer func() { // Ensure all data is read to prevent broken pipe. - _, copyErr := io.Copy(io.Discard, rdr) - if copyErr != nil { - err = fmt.Errorf("error discarding remaining data: %w", copyErr) - } - closeErr := rdr.Close() - if closeErr != nil { + if closeErr := rdr.Close(); closeErr != nil { if err != nil { - err = fmt.Errorf("%v; error closing reader: %w", err, closeErr) + err = errors.Join(err, closeErr) } else { err = fmt.Errorf("error closing reader: %w", closeErr) } diff --git a/pkg/sources/git/git.go b/pkg/sources/git/git.go index c6d35c436e45..c18c93879a9c 100644 --- a/pkg/sources/git/git.go +++ b/pkg/sources/git/git.go @@ -1213,7 +1213,14 @@ func getSafeRemoteURL(repo *git.Repository, preferred string) string { return safeURL } -func (s *Git) handleBinary(ctx context.Context, gitDir string, reporter sources.ChunkReporter, chunkSkel *sources.Chunk, commitHash plumbing.Hash, path string) error { +func (s *Git) handleBinary( + ctx context.Context, + gitDir string, + reporter sources.ChunkReporter, + chunkSkel *sources.Chunk, + commitHash plumbing.Hash, + path string, +) (err error) { fileCtx := context.WithValues(ctx, "commit", commitHash.String()[:7], "path", path) fileCtx.Logger().V(5).Info("handling binary file") @@ -1227,9 +1234,32 @@ func (s *Git) handleBinary(ctx context.Context, gitDir string, reporter sources. return nil } - cmd := exec.Command("git", "-C", gitDir, "cat-file", "blob", commitHash.String()+":"+path) + const ( + cmdTimeout = 60 * time.Second + waitDelay = 5 * time.Second + ) + // NOTE: This kludge ensures the context timeout for the 'git cat-file' command + // matches the timeout for the HandleFile operation. + // By setting both timeouts to the same value, we can be more confident + // that both operations will run for the same duration. + // The command execution includes a small Wait delay before terminating the process, + // giving HandleFile time to respect the context + // and return before the process is forcibly killed. + // This approach helps prevent premature termination and allows for more complete processing. + + // TODO: Develop a more robust mechanism to ensure consistent timeout behavior between the command execution + // and the HandleFile operation. This should prevent premature termination and allow for complete processing. + handlers.SetArchiveMaxTimeout(cmdTimeout) + + // Create a timeout context for the 'git cat-file' command to ensure it does not run indefinitely. + // This prevents potential resource exhaustion by terminating the command if it exceeds the specified duration. + catFileCtx, cancel := context.WithTimeoutCause(fileCtx, cmdTimeout, errors.New("git cat-file timeout")) + defer cancel() + + cmd := exec.CommandContext(catFileCtx, "git", "-C", gitDir, "cat-file", "blob", commitHash.String()+":"+path) var stderr bytes.Buffer cmd.Stderr = &stderr + cmd.WaitDelay = waitDelay // give the command a chance to finish before the timeout :) stdout, err := cmd.StdoutPipe() if err != nil { @@ -1240,9 +1270,30 @@ func (s *Git) handleBinary(ctx context.Context, gitDir string, reporter sources. return fmt.Errorf("error starting git cat-file: %w\n%s", err, stderr.Bytes()) } - defer func() { _ = cmd.Wait() }() + // Ensure all data from the reader (stdout) is consumed to prevent broken pipe errors. + // This operation discards any remaining data after HandleFile completion. + // If the reader is fully consumed, the copy is essentially a no-op. + // If an error occurs while discarding, it will be logged and combined with any existing error. + // The command's completion is then awaited and any execution errors are handled. + defer func() { + n, copyErr := io.Copy(io.Discard, stdout) + if copyErr != nil { + ctx.Logger().Error( + copyErr, + "Failed to discard remaining stdout data after HandleFile completion", + ) + } + + ctx.Logger().V(3).Info( + "HandleFile did not consume all stdout data; excess discarded", + "bytes_discarded", n) + + // Wait for the command to finish and handle any errors. + waitErr := cmd.Wait() + err = errors.Join(err, copyErr, waitErr) + }() - return handlers.HandleFile(fileCtx, stdout, chunkSkel, reporter, handlers.WithSkipArchives(s.skipArchives)) + return handlers.HandleFile(catFileCtx, stdout, chunkSkel, reporter, handlers.WithSkipArchives(s.skipArchives)) } func (s *Source) Enumerate(ctx context.Context, reporter sources.UnitReporter) error {