Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: fix Windows blocking on pipe close #4400

Merged
merged 2 commits into from
Jun 11, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 42 additions & 11 deletions client/driver/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ func (e *UniversalExecutor) configureLoggers() error {
return fmt.Errorf("error creating new stdout log file for %q: %v", e.ctx.Task.Name, err)
}

r, err := NewLogRotatorWrapper(lro)
r, err := newLogRotatorWrapper(e.logger, lro)
if err != nil {
return err
}
Expand All @@ -378,7 +378,7 @@ func (e *UniversalExecutor) configureLoggers() error {
return fmt.Errorf("error creating new stderr log file for %q: %v", e.ctx.Task.Name, err)
}

r, err := NewLogRotatorWrapper(lre)
r, err := newLogRotatorWrapper(e.logger, lre)
if err != nil {
return err
}
Expand Down Expand Up @@ -851,11 +851,12 @@ type logRotatorWrapper struct {
processOutReader *os.File
rotatorWriter *logging.FileRotator
hasFinishedCopied chan struct{}
logger *log.Logger
}

// NewLogRotatorWrapper takes a rotator and returns a wrapper that has the
// newLogRotatorWrapper takes a rotator and returns a wrapper that has the
// processOutWriter to attach to the processes stdout or stderr.
func NewLogRotatorWrapper(rotator *logging.FileRotator) (*logRotatorWrapper, error) {
func newLogRotatorWrapper(logger *log.Logger, rotator *logging.FileRotator) (*logRotatorWrapper, error) {
r, w, err := os.Pipe()
if err != nil {
return nil, fmt.Errorf("failed to create os.Pipe for extracting logs: %v", err)
Expand All @@ -865,7 +866,8 @@ func NewLogRotatorWrapper(rotator *logging.FileRotator) (*logRotatorWrapper, err
processOutWriter: w,
processOutReader: r,
rotatorWriter: rotator,
hasFinishedCopied: make(chan struct{}, 1),
hasFinishedCopied: make(chan struct{}),
logger: logger,
}
wrap.start()
return wrap, nil
Expand All @@ -875,22 +877,51 @@ func NewLogRotatorWrapper(rotator *logging.FileRotator) (*logRotatorWrapper, err
// called by the constructor and not the user of the wrapper.
func (l *logRotatorWrapper) start() {
go func() {
io.Copy(l.rotatorWriter, l.processOutReader)
l.processOutReader.Close() // in case io.Copy stopped due to write error
close(l.hasFinishedCopied)
defer close(l.hasFinishedCopied)
_, err := io.Copy(l.rotatorWriter, l.processOutReader)
if err != nil {
// Close reader to propagate io error across pipe.
// Note that this may block until the process exits on
// Windows due to
// https://github.com/PowerShell/PowerShell/issues/4254
// or similar issues. Since this is already running in
// a goroutine its safe to block until the process is
// force-killed.
l.processOutReader.Close()
}
}()
return
}

// Close closes the rotator and the process writer to ensure that the Wait
// command exits.
func (l *logRotatorWrapper) Close() error {
func (l *logRotatorWrapper) Close() {
// Wait up to the close tolerance before we force close
select {
case <-l.hasFinishedCopied:
case <-time.After(processOutputCloseTolerance):
}
err := l.processOutReader.Close()

// Closing the read side of a pipe may block on Windows if the process
Copy link
Contributor

@preetapan preetapan Jun 11, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We added this close for processOutreader in 4150296 to fix this failing test TestExecutor_Start_Wait. Removing this entirely doesn't affect that test (I tested with -count 50 locally). Wondering if we can remove that l.processOutReader.Close() line 893 entirely. That would simplify all this even more.

That test was failing because without the channel blocked wait on hasFinishedCopied, a very short lived command would never get its standard output read and stored in the log file. Now that we added a wait on that channel and a grace period of 2 seconds, I don't see why we need to call close on the processOutreader again in line 893.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My understanding of line 893 (now 890) is that it is to propagate errors from the io.Copy destination (rotator) back to the source (processOutReader).

So if io.Copy fails to write, it needs to signal to processOutReader that i will never be read again by Close()ing it.

I think this will handle cases like running-out-of-disk where we can no longer write anything, so we signal that to the process by closing its output (and likely causing the process to crash).

// is being debugged as in:
// https://github.com/PowerShell/PowerShell/issues/4254
// The pipe will be closed and cleaned up when the process exits.
closeDone := make(chan struct{})
go func() {
defer close(closeDone)
err := l.processOutReader.Close()
if err != nil && !strings.Contains(err.Error(), "file already closed") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this logging meant to be temporary?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably not? I don't think there's any way to know when it would be safe to remove.

The contains check is just to prevent spamming the logs since we close the pipe multiple times. We could probably try to fix that, but I'm not sure it's worth the effort as there's no harm in multiple Closes.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What value is logging the error giving if we don't change the outcome in the call site anyway by ignoring the error

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Honestly "file already closed" is the only error I've ever seen returned from Close(), so I don't expect to ever see this code hit.

I put it in in case it helped debug future issues similar to this. I have never seen anything like this behavior before (eg the blocking on Close), so I have little idea where extra logging might be helpful in the future or just noise.

l.logger.Printf("[WARN] executor: error closing read-side of process output pipe: %v", err)
}

}()

select {
case <-closeDone:
case <-time.After(processOutputCloseTolerance):
l.logger.Printf("[WARN] executor: timed out waiting for read-side of process output pipe to close")
}

l.rotatorWriter.Close()
return err
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This error was never being checked, so there's no harm in elliding it in favor of logging errors directly from this method.

return
}
2 changes: 1 addition & 1 deletion client/driver/executor/executor_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (e *UniversalExecutor) shutdownProcess(proc *os.Process) error {
if err := sendCtrlBreak(proc.Pid); err != nil {
return fmt.Errorf("executor.shutdown error: %v", err)
}
e.logger.Printf("Sent Ctrl-Break to process %v", proc.Pid)
e.logger.Printf("[INFO] executor: sent Ctrl-Break to process %v", proc.Pid)

return nil
}