diff --git a/client/logmon/logmon.go b/client/logmon/logmon.go index f230b3b080a..f5ffc541bd3 100644 --- a/client/logmon/logmon.go +++ b/client/logmon/logmon.go @@ -1,6 +1,7 @@ package logmon import ( + "context" "fmt" "io" "os" @@ -177,6 +178,8 @@ func NewTaskLogger(cfg *LogConfig, logger hclog.Logger) (*TaskLogger, error) { // log rotator data. The processOutWriter should be attached to the process and // data will be copied from the reader to the rotator. type logRotatorWrapper struct { + retryCtx context.Context + retryCancel context.CancelFunc fifoPath string rotatorWriter io.WriteCloser hasFinishedCopied chan struct{} @@ -217,8 +220,11 @@ func newLogRotatorWrapper(path string, logger hclog.Logger, rotator io.WriteClos logger.Error("failed to create FIFO", "stat_error", serr, "create_err", err) return nil, fmt.Errorf("failed to create fifo for extracting logs: %v", err) } + ctx, cancel := context.WithCancel(context.Background()) wrap := &logRotatorWrapper{ + retryCtx: ctx, + retryCancel: cancel, fifoPath: path, rotatorWriter: rotator, hasFinishedCopied: make(chan struct{}), @@ -235,6 +241,11 @@ func newLogRotatorWrapper(path string, logger hclog.Logger, rotator io.WriteClos func (l *logRotatorWrapper) start(openFn func() (io.ReadCloser, error)) { go func() { defer close(l.hasFinishedCopied) + RETRY_COPY: + if l.retryCtx.Err() != nil { + l.logger.Warn("fifo context cancelled, exiting") + return + } reader, err := openFn() if err != nil { @@ -256,6 +267,21 @@ func (l *logRotatorWrapper) start(openFn func() (io.ReadCloser, error)) { // force-killed. reader.Close() } + // Close reader + reader.Close() + + // Check if the context has been closed + // return if it has, otherwise wait before + // restarting copy + select { + case <-l.retryCtx.Done(): + return + case <-time.After(1 * time.Second): + } + + // reset openCompleted + l.openCompleted = make(chan struct{}) + goto RETRY_COPY }() return } @@ -263,6 +289,9 @@ func (l *logRotatorWrapper) start(openFn func() (io.ReadCloser, error)) { // Close closes the rotator and the process writer to ensure that the Wait // command exits. func (l *logRotatorWrapper) Close() { + // Cancel the retryable context + l.retryCancel() + // Wait up to the close tolerance before we force close select { case <-l.hasFinishedCopied: diff --git a/client/logmon/logmon_test.go b/client/logmon/logmon_test.go index 8f0fa8f1c7c..91f5128d491 100644 --- a/client/logmon/logmon_test.go +++ b/client/logmon/logmon_test.go @@ -131,16 +131,10 @@ func TestLogmon_Start_restart_flusheslogs(t *testing.T) { }) require.True(impl.tl.IsRunning()) - // Close stdout and assert that logmon no longer writes to the file + // Close stdout and stderr require.NoError(stdout.Close()) require.NoError(stderr.Close()) - testutil.WaitForResult(func() (bool, error) { - return !impl.tl.IsRunning(), fmt.Errorf("logmon is still running") - }, func(err error) { - require.NoError(err) - }) - stdout, err = fifo.OpenWriter(stdoutFifoPath) require.NoError(err) stderr, err = fifo.OpenWriter(stderrFifoPath) @@ -234,16 +228,10 @@ func TestLogmon_Start_restart(t *testing.T) { }) require.True(impl.tl.IsRunning()) - // Close stdout and assert that logmon no longer writes to the file + // Close stdout and assert that logmon no longer writes to the fileA require.NoError(stdout.Close()) require.NoError(stderr.Close()) - testutil.WaitForResult(func() (bool, error) { - return !impl.tl.IsRunning(), fmt.Errorf("logmon is still running") - }, func(err error) { - require.NoError(err) - }) - // Start logmon again and assert that it can receive logs again require.NoError(lm.Start(cfg)) @@ -267,6 +255,74 @@ func TestLogmon_Start_restart(t *testing.T) { }) } +// asserts that start goroutine properly exists when ctx is cancelled +func TestLogmon_Start_restart_Close(t *testing.T) { + require := require.New(t) + var stdoutFifoPath, stderrFifoPath string + + dir, err := ioutil.TempDir("", "nomadtest") + require.NoError(err) + defer os.RemoveAll(dir) + + if runtime.GOOS == "windows" { + stdoutFifoPath = "//./pipe/test-restart.stdout" + stderrFifoPath = "//./pipe/test-restart.stderr" + } else { + stdoutFifoPath = filepath.Join(dir, "stdout.fifo") + stderrFifoPath = filepath.Join(dir, "stderr.fifo") + } + + cfg := &LogConfig{ + LogDir: dir, + StdoutLogFile: "stdout", + StdoutFifo: stdoutFifoPath, + StderrLogFile: "stderr", + StderrFifo: stderrFifoPath, + MaxFiles: 2, + MaxFileSizeMB: 1, + } + + lm := NewLogMon(testlog.HCLogger(t)) + impl, ok := lm.(*logmonImpl) + require.True(ok) + require.NoError(lm.Start(cfg)) + + stdout, err := fifo.OpenWriter(stdoutFifoPath) + require.NoError(err) + stderr, err := fifo.OpenWriter(stderrFifoPath) + require.NoError(err) + + // Write a string and assert it was written to the file + _, err = stdout.Write([]byte("test\n")) + require.NoError(err) + + testutil.WaitForResult(func() (bool, error) { + raw, err := ioutil.ReadFile(filepath.Join(dir, "stdout.0")) + if err != nil { + return false, err + } + return "test\n" == string(raw), fmt.Errorf("unexpected stdout %q", string(raw)) + }, func(err error) { + require.NoError(err) + }) + require.True(impl.tl.IsRunning()) + + // Close stdout and assert that logmon no longer writes to the fileA + require.NoError(stdout.Close()) + require.NoError(stderr.Close()) + + // Close the task logger + impl.tl.Close() + + // Ensure that the task logger is no longer running + testutil.WaitForResult(func() (bool, error) { + return !impl.tl.IsRunning(), fmt.Errorf("logmon is still running") + }, func(err error) { + require.NoError(err) + }) + +} + // panicWriter panics on use type panicWriter struct{}