Skip to content

Commit

Permalink
retry copy if copy receives EOF but still running
Browse files Browse the repository at this point in the history
  • Loading branch information
drewbailey committed Jun 11, 2020
1 parent a7291fc commit efacb1e
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 14 deletions.
29 changes: 29 additions & 0 deletions client/logmon/logmon.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package logmon

import (
"context"
"fmt"
"io"
"os"
Expand Down Expand Up @@ -177,6 +178,8 @@ func NewTaskLogger(cfg *LogConfig, logger hclog.Logger) (*TaskLogger, error) {
// log rotator data. The processOutWriter should be attached to the process and
// data will be copied from the reader to the rotator.
type logRotatorWrapper struct {
retryCtx context.Context
retryCancel context.CancelFunc
fifoPath string
rotatorWriter io.WriteCloser
hasFinishedCopied chan struct{}
Expand Down Expand Up @@ -217,8 +220,11 @@ func newLogRotatorWrapper(path string, logger hclog.Logger, rotator io.WriteClos
logger.Error("failed to create FIFO", "stat_error", serr, "create_err", err)
return nil, fmt.Errorf("failed to create fifo for extracting logs: %v", err)
}
ctx, cancel := context.WithCancel(context.Background())

wrap := &logRotatorWrapper{
retryCtx: ctx,
retryCancel: cancel,
fifoPath: path,
rotatorWriter: rotator,
hasFinishedCopied: make(chan struct{}),
Expand All @@ -235,6 +241,11 @@ func newLogRotatorWrapper(path string, logger hclog.Logger, rotator io.WriteClos
func (l *logRotatorWrapper) start(openFn func() (io.ReadCloser, error)) {
go func() {
defer close(l.hasFinishedCopied)
RETRY_COPY:
if l.retryCtx.Err() != nil {
l.logger.Warn("fifo context cancelled, exiting")
return
}

reader, err := openFn()
if err != nil {
Expand All @@ -256,13 +267,31 @@ func (l *logRotatorWrapper) start(openFn func() (io.ReadCloser, error)) {
// force-killed.
reader.Close()
}
// Close reader
reader.Close()

// Check if the context has been closed
// return if it has, otherwise wait before
// restarting copy
select {
case <-l.retryCtx.Done():
return
case <-time.After(1 * time.Second):
}

// reset openCompleted
l.openCompleted = make(chan struct{})
goto RETRY_COPY
}()
return
}

// Close closes the rotator and the process writer to ensure that the Wait
// command exits.
func (l *logRotatorWrapper) Close() {
// Cancel the retryable context
l.retryCancel()

// Wait up to the close tolerance before we force close
select {
case <-l.hasFinishedCopied:
Expand Down
84 changes: 70 additions & 14 deletions client/logmon/logmon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,16 +131,10 @@ func TestLogmon_Start_restart_flusheslogs(t *testing.T) {
})
require.True(impl.tl.IsRunning())

// Close stdout and assert that logmon no longer writes to the file
// Close stdout and stderr
require.NoError(stdout.Close())
require.NoError(stderr.Close())

testutil.WaitForResult(func() (bool, error) {
return !impl.tl.IsRunning(), fmt.Errorf("logmon is still running")
}, func(err error) {
require.NoError(err)
})

stdout, err = fifo.OpenWriter(stdoutFifoPath)
require.NoError(err)
stderr, err = fifo.OpenWriter(stderrFifoPath)
Expand Down Expand Up @@ -234,16 +228,10 @@ func TestLogmon_Start_restart(t *testing.T) {
})
require.True(impl.tl.IsRunning())

// Close stdout and assert that logmon no longer writes to the file
// Close stdout and assert that logmon no longer writes to the file
require.NoError(stdout.Close())
require.NoError(stderr.Close())

testutil.WaitForResult(func() (bool, error) {
return !impl.tl.IsRunning(), fmt.Errorf("logmon is still running")
}, func(err error) {
require.NoError(err)
})

// Start logmon again and assert that it can receive logs again
require.NoError(lm.Start(cfg))

Expand All @@ -267,6 +255,74 @@ func TestLogmon_Start_restart(t *testing.T) {
})
}

// TestLogmon_Start_restart_Close asserts that the start goroutine properly
// exits when the task logger is closed after the FIFO writers have been
// closed.
func TestLogmon_Start_restart_Close(t *testing.T) {
	require := require.New(t)

	tmpDir, err := ioutil.TempDir("", "nomadtest")
	require.NoError(err)
	defer os.RemoveAll(tmpDir)

	// Windows uses fixed named-pipe paths; every other platform places the
	// FIFOs inside the temporary directory.
	var outFifo, errFifo string
	switch runtime.GOOS {
	case "windows":
		outFifo = "//./pipe/test-restart.stdout"
		errFifo = "//./pipe/test-restart.stderr"
	default:
		outFifo = filepath.Join(tmpDir, "stdout.fifo")
		errFifo = filepath.Join(tmpDir, "stderr.fifo")
	}

	logCfg := &LogConfig{
		LogDir:        tmpDir,
		StdoutLogFile: "stdout",
		StdoutFifo:    outFifo,
		StderrLogFile: "stderr",
		StderrFifo:    errFifo,
		MaxFiles:      2,
		MaxFileSizeMB: 1,
	}

	lm := NewLogMon(testlog.HCLogger(t))
	impl, ok := lm.(*logmonImpl)
	require.True(ok)
	require.NoError(lm.Start(logCfg))

	outWriter, err := fifo.OpenWriter(outFifo)
	require.NoError(err)
	errWriter, err := fifo.OpenWriter(errFifo)
	require.NoError(err)

	// Push one line through the stdout FIFO and wait until it shows up in
	// the first rotated stdout file.
	_, err = outWriter.Write([]byte("test\n"))
	require.NoError(err)

	testutil.WaitForResult(func() (bool, error) {
		contents, readErr := ioutil.ReadFile(filepath.Join(tmpDir, "stdout.0"))
		if readErr != nil {
			return false, readErr
		}
		got := string(contents)
		return got == "test\n", fmt.Errorf("unexpected stdout %q", got)
	}, func(err error) {
		require.NoError(err)
	})
	require.True(impl.tl.IsRunning())

	// Close both FIFO writers. This alone is not asserted to stop the task
	// logger; the explicit Close below is what the test relies on.
	require.NoError(outWriter.Close())
	require.NoError(errWriter.Close())

	// Close the task logger.
	impl.tl.Close()

	// The task logger must eventually report that it is no longer running.
	testutil.WaitForResult(func() (bool, error) {
		return !impl.tl.IsRunning(), fmt.Errorf("logmon is still running")
	}, func(err error) {
		require.NoError(err)
	})
}

// panicWriter is an empty struct type that panics on use.
// NOTE(review): presumably a test double for a writer that must never be
// written to — confirm against its method set.
type panicWriter struct{}

Expand Down

0 comments on commit efacb1e

Please sign in to comment.