Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

retry copy if copy receives EOF but task is still running #8149

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions client/logmon/logmon.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package logmon

import (
"context"
"fmt"
"io"
"os"
Expand Down Expand Up @@ -177,6 +178,8 @@ func NewTaskLogger(cfg *LogConfig, logger hclog.Logger) (*TaskLogger, error) {
// log rotator data. The processOutWriter should be attached to the process and
// data will be copied from the reader to the rotator.
type logRotatorWrapper struct {
retryCtx context.Context
retryCancel context.CancelFunc
fifoPath string
rotatorWriter io.WriteCloser
hasFinishedCopied chan struct{}
Expand Down Expand Up @@ -217,8 +220,11 @@ func newLogRotatorWrapper(path string, logger hclog.Logger, rotator io.WriteClos
logger.Error("failed to create FIFO", "stat_error", serr, "create_err", err)
return nil, fmt.Errorf("failed to create fifo for extracting logs: %v", err)
}
ctx, cancel := context.WithCancel(context.Background())

wrap := &logRotatorWrapper{
retryCtx: ctx,
retryCancel: cancel,
fifoPath: path,
rotatorWriter: rotator,
hasFinishedCopied: make(chan struct{}),
Expand All @@ -235,6 +241,11 @@ func newLogRotatorWrapper(path string, logger hclog.Logger, rotator io.WriteClos
func (l *logRotatorWrapper) start(openFn func() (io.ReadCloser, error)) {
go func() {
defer close(l.hasFinishedCopied)
RETRY_COPY:
if l.retryCtx.Err() != nil {
l.logger.Warn("fifo context cancelled, exiting")
return
}

reader, err := openFn()
if err != nil {
Expand All @@ -256,13 +267,31 @@ func (l *logRotatorWrapper) start(openFn func() (io.ReadCloser, error)) {
// force-killed.
reader.Close()
}
// Close reader
reader.Close()

// Check if the context has been closed
// return if it has, otherwise wait before
// restarting copy
select {
case <-l.retryCtx.Done():
return
case <-time.After(1 * time.Second):
}

// reset openCompleted
l.openCompleted = make(chan struct{})
goto RETRY_COPY
}()
return
}

// Close closes the rotator and the process writer to ensure that the Wait
// command exits.
func (l *logRotatorWrapper) Close() {
// Cancel the retryable context
l.retryCancel()

// Wait up to the close tolerance before we force close
select {
case <-l.hasFinishedCopied:
Expand Down
84 changes: 70 additions & 14 deletions client/logmon/logmon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,16 +131,10 @@ func TestLogmon_Start_restart_flusheslogs(t *testing.T) {
})
require.True(impl.tl.IsRunning())

// Close stdout and assert that logmon no longer writes to the file
// Close stdout and stderr
require.NoError(stdout.Close())
require.NoError(stderr.Close())

testutil.WaitForResult(func() (bool, error) {
return !impl.tl.IsRunning(), fmt.Errorf("logmon is still running")
}, func(err error) {
require.NoError(err)
})

stdout, err = fifo.OpenWriter(stdoutFifoPath)
require.NoError(err)
stderr, err = fifo.OpenWriter(stderrFifoPath)
Expand Down Expand Up @@ -234,16 +228,10 @@ func TestLogmon_Start_restart(t *testing.T) {
})
require.True(impl.tl.IsRunning())

// Close stdout and assert that logmon no longer writes to the file
// Close stdout and stderr
require.NoError(stdout.Close())
require.NoError(stderr.Close())

testutil.WaitForResult(func() (bool, error) {
return !impl.tl.IsRunning(), fmt.Errorf("logmon is still running")
}, func(err error) {
require.NoError(err)
})

// Start logmon again and assert that it can receive logs again
require.NoError(lm.Start(cfg))

Expand All @@ -267,6 +255,74 @@ func TestLogmon_Start_restart(t *testing.T) {
})
}

// TestLogmon_Start_restart_Close checks that the task logger stops running
// once both FIFO writers have been closed and Close has been called on it.
func TestLogmon_Start_restart_Close(t *testing.T) {
	must := require.New(t)

	// Scratch directory for the rotated log files; removed when the test ends.
	dir, err := ioutil.TempDir("", "nomadtest")
	must.NoError(err)
	defer os.RemoveAll(dir)

	// Default to FIFOs inside the scratch directory; Windows uses fixed
	// //./pipe/ paths instead.
	stdoutFifoPath := filepath.Join(dir, "stdout.fifo")
	stderrFifoPath := filepath.Join(dir, "stderr.fifo")
	if runtime.GOOS == "windows" {
		stdoutFifoPath = "//./pipe/test-restart.stdout"
		stderrFifoPath = "//./pipe/test-restart.stderr"
	}

	cfg := &LogConfig{
		LogDir:        dir,
		StdoutLogFile: "stdout",
		StdoutFifo:    stdoutFifoPath,
		StderrLogFile: "stderr",
		StderrFifo:    stderrFifoPath,
		MaxFiles:      2,
		MaxFileSizeMB: 1,
	}

	lm := NewLogMon(testlog.HCLogger(t))
	impl, ok := lm.(*logmonImpl)
	must.True(ok)
	must.NoError(lm.Start(cfg))

	stdout, err := fifo.OpenWriter(stdoutFifoPath)
	must.NoError(err)
	stderr, err := fifo.OpenWriter(stderrFifoPath)
	must.NoError(err)

	// Shared failure callback for the WaitForResult polls below.
	failOnErr := func(err error) {
		must.NoError(err)
	}

	// Push one line through the stdout FIFO and wait until it shows up in the
	// first rotated stdout file.
	const payload = "test\n"
	_, err = stdout.Write([]byte(payload))
	must.NoError(err)

	stdoutWritten := func() (bool, error) {
		raw, err := ioutil.ReadFile(filepath.Join(dir, "stdout.0"))
		if err != nil {
			return false, err
		}
		got := string(raw)
		return got == payload, fmt.Errorf("unexpected stdout %q", got)
	}
	testutil.WaitForResult(stdoutWritten, failOnErr)
	must.True(impl.tl.IsRunning())

	// Close the writer side of both FIFOs.
	must.NoError(stdout.Close())
	must.NoError(stderr.Close())

	// Close the task logger itself.
	impl.tl.Close()

	// The task logger must eventually report that it is no longer running.
	loggerStopped := func() (bool, error) {
		return !impl.tl.IsRunning(), fmt.Errorf("logmon is still running")
	}
	testutil.WaitForResult(loggerStopped, failOnErr)
}

// panicWriter panics on use
type panicWriter struct{}

Expand Down