Skip to content

Commit

Permalink
Merge pull request #4336 from hashicorp/b-blocking
Browse files Browse the repository at this point in the history
Force closing of pipe to child process
  • Loading branch information
dadgar authored May 29, 2018
2 parents 20bc6e4 + 52bed80 commit d633ce3
Showing 1 changed file with 72 additions and 19 deletions.
91 changes: 72 additions & 19 deletions client/driver/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,8 @@ type UniversalExecutor struct {
processExited chan interface{}
fsIsolationEnforced bool

lre *logging.FileRotator
lro *logging.FileRotator
lre *logRotatorWrapper
lro *logRotatorWrapper
rotatorLock sync.Mutex

syslogServer *logging.SyslogServer
Expand Down Expand Up @@ -252,8 +252,8 @@ func (e *UniversalExecutor) LaunchCmd(command *ExecCommand) (*ProcessState, erro
if err := e.configureLoggers(); err != nil {
return nil, err
}
e.cmd.Stdout = e.lro
e.cmd.Stderr = e.lre
e.cmd.Stdout = e.lro.processOutWriter
e.cmd.Stderr = e.lre.processOutWriter

// Look up the binary path and make it executable
absPath, err := e.lookupBin(e.ctx.TaskEnv.ReplaceEnv(command.Cmd))
Expand Down Expand Up @@ -348,7 +348,12 @@ func (e *UniversalExecutor) configureLoggers() error {
if err != nil {
return fmt.Errorf("error creating new stdout log file for %q: %v", e.ctx.Task.Name, err)
}
e.lro = lro

r, err := NewLogRotatorWrapper(lro)
if err != nil {
return err
}
e.lro = r
}

if e.lre == nil {
Expand All @@ -357,7 +362,12 @@ func (e *UniversalExecutor) configureLoggers() error {
if err != nil {
return fmt.Errorf("error creating new stderr log file for %q: %v", e.ctx.Task.Name, err)
}
e.lre = lre

r, err := NewLogRotatorWrapper(lre)
if err != nil {
return err
}
e.lre = r
}
return nil
}
Expand All @@ -375,14 +385,14 @@ func (e *UniversalExecutor) UpdateLogConfig(logConfig *structs.LogConfig) error
if e.lro == nil {
return fmt.Errorf("log rotator for stdout doesn't exist")
}
e.lro.MaxFiles = logConfig.MaxFiles
e.lro.FileSize = int64(logConfig.MaxFileSizeMB * 1024 * 1024)
e.lro.rotatorWriter.MaxFiles = logConfig.MaxFiles
e.lro.rotatorWriter.FileSize = int64(logConfig.MaxFileSizeMB * 1024 * 1024)

if e.lre == nil {
return fmt.Errorf("log rotator for stderr doesn't exist")
}
e.lre.MaxFiles = logConfig.MaxFiles
e.lre.FileSize = int64(logConfig.MaxFileSizeMB * 1024 * 1024)
e.lre.rotatorWriter.MaxFiles = logConfig.MaxFiles
e.lre.rotatorWriter.FileSize = int64(logConfig.MaxFileSizeMB * 1024 * 1024)
return nil
}

Expand All @@ -393,10 +403,10 @@ func (e *UniversalExecutor) UpdateTask(task *structs.Task) error {
e.rotatorLock.Lock()
if e.lro != nil && e.lre != nil {
fileSize := int64(task.LogConfig.MaxFileSizeMB * 1024 * 1024)
e.lro.MaxFiles = task.LogConfig.MaxFiles
e.lro.FileSize = fileSize
e.lre.MaxFiles = task.LogConfig.MaxFiles
e.lre.FileSize = fileSize
e.lro.rotatorWriter.MaxFiles = task.LogConfig.MaxFiles
e.lro.rotatorWriter.FileSize = fileSize
e.lre.rotatorWriter.MaxFiles = task.LogConfig.MaxFiles
e.lre.rotatorWriter.FileSize = fileSize
}
e.rotatorLock.Unlock()
return nil
Expand Down Expand Up @@ -799,7 +809,7 @@ func (e *UniversalExecutor) LaunchSyslogServer() (*SyslogServerState, error) {

e.syslogServer = logging.NewSyslogServer(l, e.syslogChan, e.logger)
go e.syslogServer.Start()
go e.collectLogs(e.lre, e.lro)
go e.collectLogs(e.lre.rotatorWriter, e.lro.rotatorWriter)
syslogAddr := fmt.Sprintf("%s://%s", l.Addr().Network(), l.Addr().String())
return &SyslogServerState{Addr: syslogAddr}, nil
}
Expand All @@ -809,11 +819,54 @@ func (e *UniversalExecutor) collectLogs(we io.Writer, wo io.Writer) {
// If the severity of the log line is err then we write to stderr
// otherwise all messages go to stdout
if logParts.Severity == syslog.LOG_ERR {
e.lre.Write(logParts.Message)
e.lre.Write([]byte{'\n'})
we.Write(logParts.Message)
we.Write([]byte{'\n'})
} else {
e.lro.Write(logParts.Message)
e.lro.Write([]byte{'\n'})
wo.Write(logParts.Message)
wo.Write([]byte{'\n'})
}
}
}

// logRotatorWrapper wraps our log rotator and exposes a pipe that can feed the
// log rotator data. The processOutWriter should be attached to the process and
// data will be copied from the reader to the rotator.
type logRotatorWrapper struct {
processOutWriter *os.File
processOutReader *os.File
rotatorWriter *logging.FileRotator
}

// NewLogRotatorWrapper takes a rotator and returns a wrapper that has the
// processOutWriter to attach to the processes stdout or stderr.
func NewLogRotatorWrapper(rotator *logging.FileRotator) (*logRotatorWrapper, error) {
r, w, err := os.Pipe()
if err != nil {
return nil, fmt.Errorf("failed to create os.Pipe for extracting logs: %v", err)
}

wrap := &logRotatorWrapper{
processOutWriter: w,
processOutReader: r,
rotatorWriter: rotator,
}
wrap.start()
return wrap, nil
}

// start starts a go-routine that copies from the pipe into the rotator. This is
// called by the constructor and not the user of the wrapper.
func (l *logRotatorWrapper) start() {
go func() {
io.Copy(l.rotatorWriter, l.processOutReader)
l.processOutReader.Close() // in case io.Copy stopped due to write error
}()
return
}

// Close closes the rotator and the process writer to ensure that the Wait
// command exits.
func (l *logRotatorWrapper) Close() error {
l.rotatorWriter.Close()
return l.processOutWriter.Close()
}

0 comments on commit d633ce3

Please sign in to comment.