diff --git a/pkg/logs/auditor/auditor.go b/pkg/logs/auditor/auditor.go index b72f3669d3dbb..8c9cfc179d4e8 100644 --- a/pkg/logs/auditor/auditor.go +++ b/pkg/logs/auditor/auditor.go @@ -9,7 +9,6 @@ import ( "encoding/json" "fmt" "io/ioutil" - "os" "path/filepath" "time" @@ -151,14 +150,15 @@ func (a *Auditor) flushRegistry(registry map[string]*RegistryEntry, path string) return ioutil.WriteFile(path, mr, 0644) } -// GetLastCommittedOffset returns the last committed offset for a given identifier -func (a *Auditor) GetLastCommittedOffset(identifier string) (int64, int) { +// GetLastCommittedOffset returns the last committed offset for a given identifier, +// returns 0 if it does not exist. +func (a *Auditor) GetLastCommittedOffset(identifier string) int64 { r := a.readOnlyRegistryCopy(a.registry) - entry, ok := r[identifier] - if !ok { - return 0, os.SEEK_END + entry, exists := r[identifier] + if !exists { + return 0 } - return entry.Offset, os.SEEK_CUR + return entry.Offset } // GetLastCommittedTimestamp returns the last committed offset for a given identifier diff --git a/pkg/logs/auditor/auditor_test.go b/pkg/logs/auditor/auditor_test.go index 8930a3213beed..7b6f216834af6 100644 --- a/pkg/logs/auditor/auditor_test.go +++ b/pkg/logs/auditor/auditor_test.go @@ -85,14 +85,12 @@ func (suite *AuditorTestSuite) TestAuditorRecoversRegistryForOffset() { Offset: 42, } - offset, whence := suite.a.GetLastCommittedOffset(suite.source.Config.Path) + offset := suite.a.GetLastCommittedOffset(suite.source.Config.Path) suite.Equal(int64(42), offset) - suite.Equal(os.SEEK_CUR, whence) othersource := config.NewLogSource("", &config.LogsConfig{Path: "anotherpath"}) - offset, whence = suite.a.GetLastCommittedOffset(othersource.Config.Path) + offset = suite.a.GetLastCommittedOffset(othersource.Config.Path) suite.Equal(int64(0), offset) - suite.Equal(os.SEEK_END, whence) } func (suite *AuditorTestSuite) TestAuditorRecoversRegistryForTimestamp() { diff --git a/pkg/logs/input/tailer/scanner.go b/pkg/logs/input/tailer/scanner.go index cf40fff2f3801..e4d804a818357 100644 --- a/pkg/logs/input/tailer/scanner.go +++ b/pkg/logs/input/tailer/scanner.go @@ -63,27 +63,37 @@ func (s *Scanner) setup() { if _, ok := s.tailers[file.Path]; ok { log.Warn("Can't tail file twice: ", file.Path) } else { - s.setupTailer(file, false, s.pp.NextPipelineChan()) + // resume tailing from last committed offset if exists or start tailing from the end of file otherwise + // to prevent from reading a file over and over again at agent restart + tailFromBeginning := false + s.startNewTailer(file, tailFromBeginning) } } } -// setupTailer sets one tailer, making it tail from the beginning or the end -// returns true if the setup succeeded, false otherwise -func (s *Scanner) setupTailer(file *File, tailFromBeginning bool, outputChan chan message.Message) bool { - t := NewTailer(outputChan, file.Source, file.Path, s.tailerSleepDuration) +// createTailer returns a new initialized tailer +func (s *Scanner) createTailer(file *File, outputChan chan message.Message) *Tailer { + return NewTailer(outputChan, file.Source, file.Path, s.tailerSleepDuration) +} + +// startNewTailer creates a new tailer, making it tail from the last committed offset, the beginning or the end of the file, +// returns true if the operation succeeded, false otherwise +func (s *Scanner) startNewTailer(file *File, tailFromBeginning bool) bool { + tailer := s.createTailer(file, s.pp.NextPipelineChan()) + offset := s.auditor.GetLastCommittedOffset(tailer.Identifier()) var err error - if tailFromBeginning { - err = t.tailFromBeginning() + if offset > 0 { + err = tailer.recoverTailing(offset) + } else if tailFromBeginning { + err = tailer.tailFromBeginning() } else { - // resume tailing from last committed offset - err = t.recoverTailing(s.auditor.GetLastCommittedOffset(t.Identifier())) + err = tailer.tailFromEnd() } if err != nil { log.Warn(err) return false } - s.tailers[file.Path] = t + s.tailers[file.Path] = tailer return true } @@ -145,8 +155,9 @@ func (s *Scanner) scan() { } if !isTailed && tailersLen < s.tailingLimit { - // create new tailer for file - succeeded := s.setupTailer(file, false, s.pp.NextPipelineChan()) + // create a new tailer tailing from the beginning of the file if no offset has been recorded + tailFromBeginning := true + succeeded := s.startNewTailer(file, tailFromBeginning) if !succeeded { // the setup failed, let's try to tail this file in the next scan continue @@ -161,8 +172,8 @@ func (s *Scanner) scan() { continue } if didRotate { - // update tailer because of file-rotation on file - succeeded := s.onFileRotation(tailer, file) + // restart tailer because of file-rotation on file + succeeded := s.restartTailerAfterFileRotation(tailer, file) if !succeeded { // the setup failed, let's try to tail this file in the next scan continue @@ -187,12 +198,20 @@ func (s *Scanner) didFileRotate(file *File, tailer *Tailer) (bool, error) { return tailer.checkForRotation() } -// onFileRotation safely stops tailer and setup a new one -// returns true if the setup succeeded, false otherwise -func (s *Scanner) onFileRotation(tailer *Tailer, file *File) bool { +// restartTailer safely stops tailer and starts a new one +// returns true if the new tailer is up and running, false if an error occurred +func (s *Scanner) restartTailerAfterFileRotation(tailer *Tailer, file *File) bool { log.Info("Log rotation happened to ", tailer.path) tailer.StopAfterFileRotation() - return s.setupTailer(file, true, tailer.outputChan) + tailer = s.createTailer(file, tailer.outputChan) + // force reading file from beginning since it has been log-rotated + err := tailer.tailFromBeginning() + if err != nil { + log.Warn(err) + return false + } + s.tailers[file.Path] = tailer + return true } // stopTailer stops the tailer diff --git a/pkg/logs/input/tailer/scanner_test.go b/pkg/logs/input/tailer/scanner_test.go index 8b32126c3df53..ece70853c0ba0 100644 --- a/pkg/logs/input/tailer/scanner_test.go +++ b/pkg/logs/input/tailer/scanner_test.go @@ -193,6 +193,48 @@ func TestScannerTestSuite(t *testing.T) { suite.Run(t, new(ScannerTestSuite)) } +func TestScannerScanStartNewTailer(t *testing.T) { + var err error + var path string + var file *os.File + var tailer *Tailer + var msg message.Message + + testDir, err := ioutil.TempDir("", "log-scanner-test-") + assert.Nil(t, err) + + // create scanner + path = fmt.Sprintf("%s/*.log", testDir) + sources := []*config.LogSource{config.NewLogSource("", &config.LogsConfig{Type: config.FileType, Path: path})} + openFilesLimit := 2 + sleepDuration := 20 * time.Millisecond + scanner := New(sources, openFilesLimit, mock.NewMockProvider(), auditor.New(nil, ""), sleepDuration) + + // setup scanner + scanner.setup() + assert.Equal(t, 0, len(scanner.tailers)) + + // create file + path = fmt.Sprintf("%s/test.log", testDir) + file, err = os.Create(path) + assert.Nil(t, err) + + // add content + _, err = file.WriteString("hello\n") + assert.Nil(t, err) + _, err = file.WriteString("world\n") + assert.Nil(t, err) + + // test scan from beginning + scanner.scan() + assert.Equal(t, 1, len(scanner.tailers)) + tailer = scanner.tailers[path] + msg = <-tailer.outputChan + assert.Equal(t, "hello", string(msg.Content())) + msg = <-tailer.outputChan + assert.Equal(t, "world", string(msg.Content())) +} + func TestScannerScanWithTooManyFiles(t *testing.T) { var err error var path string diff --git a/pkg/logs/input/tailer/tailer.go b/pkg/logs/input/tailer/tailer.go index aeefdb3a56179..1ab6bcda5c5d7 100644 --- a/pkg/logs/input/tailer/tailer.go +++ b/pkg/logs/input/tailer/tailer.go @@ -65,10 +65,21 @@ func (t *Tailer) Identifier() string { return fmt.Sprintf("file:%s", t.path) } -// recoverTailing starts the tailing from the last log line processed, or now -// if we tail this file for the first time -func (t *Tailer) recoverTailing(offset int64, whence int) error { - return t.tailFrom(offset, whence) +// tailFromBeginning lets the tailer start tailing its file +// from the beginning +func (t *Tailer) tailFromBeginning() error { + return t.tailFrom(0, os.SEEK_SET) +} + +// tailFromEnd lets the tailer start tailing its file +// from the end +func (t *Tailer) tailFromEnd() error { + return t.tailFrom(0, os.SEEK_END) +} + +// recoverTailingFrom starts the tailing from the last log line processed +func (t *Tailer) recoverTailing(offset int64) error { + return t.tailFrom(offset, os.SEEK_SET) } // Stop stops the tailer and returns only when the decoder is flushed @@ -119,12 +130,6 @@ func (t *Tailer) tailFrom(offset int64, whence int) error { return nil } -// tailFromBeginning lets the tailer start tailing its file -// from the beginning -func (t *Tailer) tailFromBeginning() error { - return t.tailFrom(0, os.SEEK_SET) -} - // forwardMessages lets the Tailer forward log messages to the output channel func (t *Tailer) forwardMessages() { defer func() { diff --git a/pkg/logs/input/tailer/tailer_test.go b/pkg/logs/input/tailer/tailer_test.go index 17a8c47f1c833..b60da69675093 100644 --- a/pkg/logs/input/tailer/tailer_test.go +++ b/pkg/logs/input/tailer/tailer_test.go @@ -90,6 +90,35 @@ func (suite *TailerTestSuite) TestTailFromBeginning() { suite.Equal(len(lines[0])+len(lines[1])+len(lines[2]), int(suite.tl.GetReadOffset())) } +func (suite *TailerTestSuite) TestTailFromEnd() { + lines := []string{"hello world\n", "hello again\n", "good bye\n"} + + var msg message.Message + var err error + + // this line should be tailed + _, err = suite.testFile.WriteString(lines[0]) + suite.Nil(err) + + suite.tl.tailFromEnd() + + // those lines should be tailed + _, err = suite.testFile.WriteString(lines[1]) + suite.Nil(err) + _, err = suite.testFile.WriteString(lines[2]) + suite.Nil(err) + + msg = <-suite.outputChan + suite.Equal("hello again", string(msg.Content())) + suite.Equal(len(lines[0])+len(lines[1]), int(msg.GetOrigin().Offset)) + + msg = <-suite.outputChan + suite.Equal("good bye", string(msg.Content())) + suite.Equal(len(lines[0])+len(lines[1])+len(lines[2]), int(msg.GetOrigin().Offset)) + + suite.Equal(len(lines[0])+len(lines[1])+len(lines[2]), int(suite.tl.GetReadOffset())) +} + func (suite *TailerTestSuite) TestRecoverTailing() { lines := []string{"hello world\n", "hello again\n", "good bye\n"} @@ -104,7 +133,7 @@ func (suite *TailerTestSuite) TestRecoverTailing() { _, err = suite.testFile.WriteString(lines[1]) suite.Nil(err) - suite.tl.recoverTailing(int64(len(lines[0])), os.SEEK_CUR) + suite.tl.recoverTailing(int64(len(lines[0]))) // this line should be tailed _, err = suite.testFile.WriteString(lines[2]) diff --git a/releasenotes/notes/logs-fix-lines-miss-issue-7bc7046f4e6b77bc.yaml b/releasenotes/notes/logs-fix-lines-miss-issue-7bc7046f4e6b77bc.yaml new file mode 100644 index 0000000000000..f74cf6e842291 --- /dev/null +++ b/releasenotes/notes/logs-fix-lines-miss-issue-7bc7046f4e6b77bc.yaml @@ -0,0 +1,4 @@ +--- +fixes: + - | + Fix line miss [issue](https://github.com/DataDog/datadog-agent/issues/1302) that could happen when tailing new files found when scanning