Skip to content

Commit

Permalink
Fixed lines miss issue when tailing new files (#1315)
Browse files Browse the repository at this point in the history
* Fixed lines miss issue when tailing new files

* Refactored the logic for starting new tailers from file scanner
  • Loading branch information
ajacquemot authored Feb 21, 2018
1 parent a11ad67 commit fda87cb
Show file tree
Hide file tree
Showing 7 changed files with 137 additions and 40 deletions.
14 changes: 7 additions & 7 deletions pkg/logs/auditor/auditor.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"time"

Expand Down Expand Up @@ -151,14 +150,15 @@ func (a *Auditor) flushRegistry(registry map[string]*RegistryEntry, path string)
return ioutil.WriteFile(path, mr, 0644)
}

// GetLastCommittedOffset returns the last committed offset for a given identifier
func (a *Auditor) GetLastCommittedOffset(identifier string) (int64, int) {
// GetLastCommittedOffset returns the last committed offset for a given identifier,
// returns 0 if it does not exist.
func (a *Auditor) GetLastCommittedOffset(identifier string) int64 {
r := a.readOnlyRegistryCopy(a.registry)
entry, ok := r[identifier]
if !ok {
return 0, os.SEEK_END
entry, exists := r[identifier]
if !exists {
return 0
}
return entry.Offset, os.SEEK_CUR
return entry.Offset
}

// GetLastCommittedTimestamp returns the last committed timestamp for a given identifier
Expand Down
6 changes: 2 additions & 4 deletions pkg/logs/auditor/auditor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,14 +85,12 @@ func (suite *AuditorTestSuite) TestAuditorRecoversRegistryForOffset() {
Offset: 42,
}

offset, whence := suite.a.GetLastCommittedOffset(suite.source.Config.Path)
offset := suite.a.GetLastCommittedOffset(suite.source.Config.Path)
suite.Equal(int64(42), offset)
suite.Equal(os.SEEK_CUR, whence)

othersource := config.NewLogSource("", &config.LogsConfig{Path: "anotherpath"})
offset, whence = suite.a.GetLastCommittedOffset(othersource.Config.Path)
offset = suite.a.GetLastCommittedOffset(othersource.Config.Path)
suite.Equal(int64(0), offset)
suite.Equal(os.SEEK_END, whence)
}

func (suite *AuditorTestSuite) TestAuditorRecoversRegistryForTimestamp() {
Expand Down
55 changes: 37 additions & 18 deletions pkg/logs/input/tailer/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,27 +63,37 @@ func (s *Scanner) setup() {
if _, ok := s.tailers[file.Path]; ok {
log.Warn("Can't tail file twice: ", file.Path)
} else {
s.setupTailer(file, false, s.pp.NextPipelineChan())
// resume tailing from the last committed offset if one exists, otherwise start tailing from the end of the file,
// to avoid re-reading a whole file every time the agent restarts
tailFromBeginning := false
s.startNewTailer(file, tailFromBeginning)
}
}
}

// setupTailer sets one tailer, making it tail from the beginning or the end
// returns true if the setup succeeded, false otherwise
func (s *Scanner) setupTailer(file *File, tailFromBeginning bool, outputChan chan message.Message) bool {
t := NewTailer(outputChan, file.Source, file.Path, s.tailerSleepDuration)
// createTailer builds a tailer for the given file, wired to forward its
// messages to outputChan and configured with the scanner sleep duration.
// The returned tailer is created but has not started tailing yet.
func (s *Scanner) createTailer(file *File, outputChan chan message.Message) *Tailer {
	tailer := NewTailer(outputChan, file.Source, file.Path, s.tailerSleepDuration)
	return tailer
}

// startNewTailer creates a new tailer, making it tail from the last committed offset, the beginning or the end of the file,
// returns true if the operation succeeded, false otherwise
func (s *Scanner) startNewTailer(file *File, tailFromBeginning bool) bool {
tailer := s.createTailer(file, s.pp.NextPipelineChan())
offset := s.auditor.GetLastCommittedOffset(tailer.Identifier())
var err error
if tailFromBeginning {
err = t.tailFromBeginning()
if offset > 0 {
err = tailer.recoverTailing(offset)
} else if tailFromBeginning {
err = tailer.tailFromBeginning()
} else {
// resume tailing from last committed offset
err = t.recoverTailing(s.auditor.GetLastCommittedOffset(t.Identifier()))
err = tailer.tailFromEnd()
}
if err != nil {
log.Warn(err)
return false
}
s.tailers[file.Path] = t
s.tailers[file.Path] = tailer
return true
}

Expand Down Expand Up @@ -145,8 +155,9 @@ func (s *Scanner) scan() {
}

if !isTailed && tailersLen < s.tailingLimit {
// create new tailer for file
succeeded := s.setupTailer(file, false, s.pp.NextPipelineChan())
// create a new tailer tailing from the beginning of the file if no offset has been recorded
tailFromBeginning := true
succeeded := s.startNewTailer(file, tailFromBeginning)
if !succeeded {
// the setup failed, let's try to tail this file in the next scan
continue
Expand All @@ -161,8 +172,8 @@ func (s *Scanner) scan() {
continue
}
if didRotate {
// update tailer because of file-rotation on file
succeeded := s.onFileRotation(tailer, file)
// restart tailer because of file-rotation on file
succeeded := s.restartTailerAfterFileRotation(tailer, file)
if !succeeded {
// the setup failed, let's try to tail this file in the next scan
continue
Expand All @@ -187,12 +198,20 @@ func (s *Scanner) didFileRotate(file *File, tailer *Tailer) (bool, error) {
return tailer.checkForRotation()
}

// onFileRotation safely stops tailer and setup a new one
// returns true if the setup succeeded, false otherwise
func (s *Scanner) onFileRotation(tailer *Tailer, file *File) bool {
// restartTailerAfterFileRotation safely stops tailer and starts a new one on the rotated file,
// returns true if the new tailer is up and running, false if an error occurred
func (s *Scanner) restartTailerAfterFileRotation(tailer *Tailer, file *File) bool {
log.Info("Log rotation happened to ", tailer.path)
tailer.StopAfterFileRotation()
return s.setupTailer(file, true, tailer.outputChan)
tailer = s.createTailer(file, tailer.outputChan)
// force reading file from beginning since it has been log-rotated
err := tailer.tailFromBeginning()
if err != nil {
log.Warn(err)
return false
}
s.tailers[file.Path] = tailer
return true
}

// stopTailer stops the tailer
Expand Down
42 changes: 42 additions & 0 deletions pkg/logs/input/tailer/scanner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,48 @@ func TestScannerTestSuite(t *testing.T) {
suite.Run(t, new(ScannerTestSuite))
}

// TestScannerScanStartNewTailer checks that a file created after the scanner
// setup is picked up by the next scan and tailed from its beginning, so that
// the lines written before the scan are not missed.
func TestScannerScanStartNewTailer(t *testing.T) {
	var err error
	var path string
	var file *os.File
	var tailer *Tailer
	var msg message.Message

	testDir, err := ioutil.TempDir("", "log-scanner-test-")
	assert.Nil(t, err)
	// remove the temporary directory and its content once the test is over
	defer os.RemoveAll(testDir)

	// create scanner
	path = fmt.Sprintf("%s/*.log", testDir)
	sources := []*config.LogSource{config.NewLogSource("", &config.LogsConfig{Type: config.FileType, Path: path})}
	openFilesLimit := 2
	sleepDuration := 20 * time.Millisecond
	scanner := New(sources, openFilesLimit, mock.NewMockProvider(), auditor.New(nil, ""), sleepDuration)

	// setup scanner, no file matches the pattern yet so no tailer is expected
	scanner.setup()
	assert.Equal(t, 0, len(scanner.tailers))

	// create file
	path = fmt.Sprintf("%s/test.log", testDir)
	file, err = os.Create(path)
	assert.Nil(t, err)
	// release the file descriptor held by the test when it returns
	defer file.Close()

	// add content
	_, err = file.WriteString("hello\n")
	assert.Nil(t, err)
	_, err = file.WriteString("world\n")
	assert.Nil(t, err)

	// test scan from beginning: both lines were written before the scan
	// and must still be forwarded
	scanner.scan()
	assert.Equal(t, 1, len(scanner.tailers))
	tailer = scanner.tailers[path]
	msg = <-tailer.outputChan
	assert.Equal(t, "hello", string(msg.Content()))
	msg = <-tailer.outputChan
	assert.Equal(t, "world", string(msg.Content()))
}

func TestScannerScanWithTooManyFiles(t *testing.T) {
var err error
var path string
Expand Down
25 changes: 15 additions & 10 deletions pkg/logs/input/tailer/tailer.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,21 @@ func (t *Tailer) Identifier() string {
return fmt.Sprintf("file:%s", t.path)
}

// recoverTailing starts the tailing from the last log line processed, or now
// if we tail this file for the first time
func (t *Tailer) recoverTailing(offset int64, whence int) error {
return t.tailFrom(offset, whence)
// tailFromBeginning starts tailing the file from its first byte,
// it returns the error reported by tailFrom, if any
func (t *Tailer) tailFromBeginning() error {
	err := t.tailFrom(0, os.SEEK_SET)
	return err
}

// tailFromEnd starts tailing the file from its current end, so only content
// appended afterwards is read,
// it returns the error reported by tailFrom, if any
func (t *Tailer) tailFromEnd() error {
	err := t.tailFrom(0, os.SEEK_END)
	return err
}

// recoverTailing resumes tailing at the given offset, measured from the
// start of the file,
// it returns the error reported by tailFrom, if any
func (t *Tailer) recoverTailing(offset int64) error {
	err := t.tailFrom(offset, os.SEEK_SET)
	return err
}

// Stop stops the tailer and returns only when the decoder is flushed
Expand Down Expand Up @@ -119,12 +130,6 @@ func (t *Tailer) tailFrom(offset int64, whence int) error {
return nil
}

// tailFromBeginning lets the tailer start tailing its file
// from the beginning
func (t *Tailer) tailFromBeginning() error {
return t.tailFrom(0, os.SEEK_SET)
}

// forwardMessages lets the Tailer forward log messages to the output channel
func (t *Tailer) forwardMessages() {
defer func() {
Expand Down
31 changes: 30 additions & 1 deletion pkg/logs/input/tailer/tailer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,35 @@ func (suite *TailerTestSuite) TestTailFromBeginning() {
suite.Equal(len(lines[0])+len(lines[1])+len(lines[2]), int(suite.tl.GetReadOffset()))
}

// TestTailFromEnd checks that a tailer started from the end of its file skips
// the content already present and only forwards the lines written afterwards,
// with offsets that still account for the skipped content.
func (suite *TailerTestSuite) TestTailFromEnd() {
	lines := []string{"hello world\n", "hello again\n", "good bye\n"}

	var msg message.Message
	var err error

	// this line should not be tailed since it is written before the tailer starts
	_, err = suite.testFile.WriteString(lines[0])
	suite.Nil(err)

	// a failure to start tailing must fail the test instead of blocking
	// on the output channel below
	err = suite.tl.tailFromEnd()
	suite.Nil(err)

	// those lines should be tailed
	_, err = suite.testFile.WriteString(lines[1])
	suite.Nil(err)
	_, err = suite.testFile.WriteString(lines[2])
	suite.Nil(err)

	msg = <-suite.outputChan
	suite.Equal("hello again", string(msg.Content()))
	suite.Equal(len(lines[0])+len(lines[1]), int(msg.GetOrigin().Offset))

	msg = <-suite.outputChan
	suite.Equal("good bye", string(msg.Content()))
	suite.Equal(len(lines[0])+len(lines[1])+len(lines[2]), int(msg.GetOrigin().Offset))

	suite.Equal(len(lines[0])+len(lines[1])+len(lines[2]), int(suite.tl.GetReadOffset()))
}

func (suite *TailerTestSuite) TestRecoverTailing() {
lines := []string{"hello world\n", "hello again\n", "good bye\n"}

Expand All @@ -104,7 +133,7 @@ func (suite *TailerTestSuite) TestRecoverTailing() {
_, err = suite.testFile.WriteString(lines[1])
suite.Nil(err)

suite.tl.recoverTailing(int64(len(lines[0])), os.SEEK_CUR)
suite.tl.recoverTailing(int64(len(lines[0])))

// this line should be tailed
_, err = suite.testFile.WriteString(lines[2])
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
---
fixes:
- |
Fix an [issue](https://github.com/DataDog/datadog-agent/issues/1302) where log lines could be missed when tailing new files discovered during a scan

0 comments on commit fda87cb

Please sign in to comment.