From 7ece3a7d77684ea0d1b897c52b953720c5b11fe6 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Thu, 24 May 2018 15:54:28 -0700 Subject: [PATCH 1/2] Force closing of pipe to child process --- client/driver/executor/executor.go | 85 +++++++++++++++++++++++------- 1 file changed, 66 insertions(+), 19 deletions(-) diff --git a/client/driver/executor/executor.go b/client/driver/executor/executor.go index 6b18612f8c8..ba1a72c9fcd 100644 --- a/client/driver/executor/executor.go +++ b/client/driver/executor/executor.go @@ -162,8 +162,8 @@ type UniversalExecutor struct { processExited chan interface{} fsIsolationEnforced bool - lre *logging.FileRotator - lro *logging.FileRotator + lre *logRotatorWrapper + lro *logRotatorWrapper rotatorLock sync.Mutex syslogServer *logging.SyslogServer @@ -252,8 +252,8 @@ func (e *UniversalExecutor) LaunchCmd(command *ExecCommand) (*ProcessState, erro if err := e.configureLoggers(); err != nil { return nil, err } - e.cmd.Stdout = e.lro - e.cmd.Stderr = e.lre + e.cmd.Stdout = e.lro.processOutWriter + e.cmd.Stderr = e.lre.processOutWriter // Look up the binary path and make it executable absPath, err := e.lookupBin(e.ctx.TaskEnv.ReplaceEnv(command.Cmd)) @@ -348,7 +348,18 @@ func (e *UniversalExecutor) configureLoggers() error { if err != nil { return fmt.Errorf("error creating new stdout log file for %q: %v", e.ctx.Task.Name, err) } - e.lro = lro + + r, w, err := os.Pipe() + if err != nil { + return fmt.Errorf("failed to create os.Pipe for extracting logs: %v", err) + } + + e.lro = &logRotatorWrapper{ + processOutWriter: w, + processOutReader: r, + rotatorWriter: lro, + } + e.lro.Start() } if e.lre == nil { @@ -357,7 +368,18 @@ func (e *UniversalExecutor) configureLoggers() error { if err != nil { return fmt.Errorf("error creating new stderr log file for %q: %v", e.ctx.Task.Name, err) } - e.lre = lre + + r, w, err := os.Pipe() + if err != nil { + return fmt.Errorf("failed to create os.Pipe for extracting logs: %v", err) + } + + e.lre = &logRotatorWrapper{ + processOutWriter: w, + processOutReader: r, + rotatorWriter: lre, + } + e.lre.Start() } return nil } @@ -375,14 +397,14 @@ func (e *UniversalExecutor) UpdateLogConfig(logConfig *structs.LogConfig) error if e.lro == nil { return fmt.Errorf("log rotator for stdout doesn't exist") } - e.lro.MaxFiles = logConfig.MaxFiles - e.lro.FileSize = int64(logConfig.MaxFileSizeMB * 1024 * 1024) + e.lro.rotatorWriter.MaxFiles = logConfig.MaxFiles + e.lro.rotatorWriter.FileSize = int64(logConfig.MaxFileSizeMB * 1024 * 1024) if e.lre == nil { return fmt.Errorf("log rotator for stderr doesn't exist") } - e.lre.MaxFiles = logConfig.MaxFiles - e.lre.FileSize = int64(logConfig.MaxFileSizeMB * 1024 * 1024) + e.lre.rotatorWriter.MaxFiles = logConfig.MaxFiles + e.lre.rotatorWriter.FileSize = int64(logConfig.MaxFileSizeMB * 1024 * 1024) return nil } @@ -393,10 +415,10 @@ func (e *UniversalExecutor) UpdateTask(task *structs.Task) error { e.rotatorLock.Lock() if e.lro != nil && e.lre != nil { fileSize := int64(task.LogConfig.MaxFileSizeMB * 1024 * 1024) - e.lro.MaxFiles = task.LogConfig.MaxFiles - e.lro.FileSize = fileSize - e.lre.MaxFiles = task.LogConfig.MaxFiles - e.lre.FileSize = fileSize + e.lro.rotatorWriter.MaxFiles = task.LogConfig.MaxFiles + e.lro.rotatorWriter.FileSize = fileSize + e.lre.rotatorWriter.MaxFiles = task.LogConfig.MaxFiles + e.lre.rotatorWriter.FileSize = fileSize } e.rotatorLock.Unlock() return nil @@ -799,7 +821,7 @@ func (e *UniversalExecutor) LaunchSyslogServer() (*SyslogServerState, error) { e.syslogServer = logging.NewSyslogServer(l, e.syslogChan, e.logger) go e.syslogServer.Start() - go e.collectLogs(e.lre, e.lro) + go e.collectLogs(e.lre.rotatorWriter, e.lro.rotatorWriter) syslogAddr := fmt.Sprintf("%s://%s", l.Addr().Network(), l.Addr().String()) return &SyslogServerState{Addr: syslogAddr}, nil } @@ -809,11 +831,36 @@ func (e *UniversalExecutor) collectLogs(we io.Writer, wo io.Writer) { // If the severity of the log line is err then we write to stderr // otherwise all messages go to stdout if logParts.Severity == syslog.LOG_ERR { - e.lre.Write(logParts.Message) - e.lre.Write([]byte{'\n'}) + we.Write(logParts.Message) + we.Write([]byte{'\n'}) } else { - e.lro.Write(logParts.Message) - e.lro.Write([]byte{'\n'}) + wo.Write(logParts.Message) + wo.Write([]byte{'\n'}) } } } + +// logRotatorWrapper wraps our log rotator and exposes a pipe that can feed the +// log rotator data. The processOutWriter should be attached to the process and +// Start is called to copy data from the reader to the rotator. +type logRotatorWrapper struct { + processOutWriter *os.File + processOutReader *os.File + rotatorWriter *logging.FileRotator +} + +// Start starts a go-routine that copies from the pipe into the rotator. +func (l *logRotatorWrapper) Start() { + go func() { + io.Copy(l.rotatorWriter, l.processOutReader) + l.processOutReader.Close() // in case io.Copy stopped due to write error + }() + return +} + +// Close closes the rotator and the process writer to ensure that the Wait +// command exits. +func (l *logRotatorWrapper) Close() error { + l.rotatorWriter.Close() + return l.processOutWriter.Close() +} From 52bed806fca8954ef3cec0aa27a9e468299cbc83 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Thu, 24 May 2018 16:25:20 -0700 Subject: [PATCH 2/2] cleanup --- client/driver/executor/executor.go | 48 +++++++++++++++++------------- 1 file changed, 27 insertions(+), 21 deletions(-) diff --git a/client/driver/executor/executor.go b/client/driver/executor/executor.go index ba1a72c9fcd..7ddccce69f0 100644 --- a/client/driver/executor/executor.go +++ b/client/driver/executor/executor.go @@ -349,17 +349,11 @@ func (e *UniversalExecutor) configureLoggers() error { return fmt.Errorf("error creating new stdout log file for %q: %v", e.ctx.Task.Name, err) } - r, w, err := os.Pipe() + r, err := NewLogRotatorWrapper(lro) if err != nil { - return fmt.Errorf("failed to create os.Pipe for extracting logs: %v", err) + return err } - - e.lro = &logRotatorWrapper{ - processOutWriter: w, - processOutReader: r, - rotatorWriter: lro, - } - e.lro.Start() + e.lro = r } if e.lre == nil { @@ -369,17 +363,11 @@ func (e *UniversalExecutor) configureLoggers() error { return fmt.Errorf("error creating new stderr log file for %q: %v", e.ctx.Task.Name, err) } - r, w, err := os.Pipe() + r, err := NewLogRotatorWrapper(lre) if err != nil { - return fmt.Errorf("failed to create os.Pipe for extracting logs: %v", err) + return err } - - e.lre = &logRotatorWrapper{ - processOutWriter: w, - processOutReader: r, - rotatorWriter: lre, - } - e.lre.Start() + e.lre = r } return nil } @@ -842,15 +830,33 @@ func (e *UniversalExecutor) collectLogs(we io.Writer, wo io.Writer) { // logRotatorWrapper wraps our log rotator and exposes a pipe that can feed the // log rotator data. The processOutWriter should be attached to the process and -// Start is called to copy data from the reader to the rotator. +// data will be copied from the reader to the rotator. type logRotatorWrapper struct { processOutWriter *os.File processOutReader *os.File rotatorWriter *logging.FileRotator } -// Start starts a go-routine that copies from the pipe into the rotator. -func (l *logRotatorWrapper) Start() { +// NewLogRotatorWrapper takes a rotator and returns a wrapper that has the +// processOutWriter to attach to the processes stdout or stderr. +func NewLogRotatorWrapper(rotator *logging.FileRotator) (*logRotatorWrapper, error) { + r, w, err := os.Pipe() + if err != nil { + return nil, fmt.Errorf("failed to create os.Pipe for extracting logs: %v", err) + } + + wrap := &logRotatorWrapper{ + processOutWriter: w, + processOutReader: r, + rotatorWriter: rotator, + } + wrap.start() + return wrap, nil +} + +// start starts a go-routine that copies from the pipe into the rotator. This is +// called by the constructor and not the user of the wrapper. +func (l *logRotatorWrapper) start() { go func() { io.Copy(l.rotatorWriter, l.processOutReader) l.processOutReader.Close() // in case io.Copy stopped due to write error