Skip to content

Commit

Permalink
Fix some lints
Browse files Browse the repository at this point in the history
  • Loading branch information
camdencheek committed Sep 8, 2020
1 parent 8c53231 commit e8ca3a8
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 40 deletions.
39 changes: 22 additions & 17 deletions operator/builtin/input/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,7 @@ func (f *InputOperator) Start() error {
}

// Start polling goroutine
f.wg.Add(1)
go func() {
defer f.wg.Done()
f.runPoller(ctx)
}()
f.startPoller(ctx)

return nil
}
Expand All @@ -73,19 +69,26 @@ func (f *InputOperator) Stop() error {
return nil
}

func (f *InputOperator) runPoller(ctx context.Context) {
globTicker := time.NewTicker(f.PollInterval)
defer globTicker.Stop()
// startPoller kicks off a goroutine that will poll the filesystem
// periodically, checking if there are new files or new logs in the
// watched files
func (f *InputOperator) startPoller(ctx context.Context) {
f.wg.Add(1)
go func() {
defer f.wg.Done()
globTicker := time.NewTicker(f.PollInterval)
defer globTicker.Stop()

for {
select {
case <-ctx.Done():
return
case <-globTicker.C:
}

for {
select {
case <-ctx.Done():
return
case <-globTicker.C:
f.poll(ctx)
}

f.poll(ctx)
}
}()
}

func (f *InputOperator) poll(ctx context.Context) {
Expand Down Expand Up @@ -201,7 +204,9 @@ func (f *InputOperator) syncKnownFiles() {
}

f.persist.Set(knownFilesKey, buf.Bytes())
f.persist.Sync()
if err := f.persist.Sync(); err != nil {
f.Errorw("Failed to sync to database", zap.Error(err))
}
}

func (f *InputOperator) loadKnownFiles() error {
Expand Down
26 changes: 6 additions & 20 deletions operator/builtin/input/file/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,37 +400,23 @@ func TestFileSource_CopyTruncateWriteBoth(t *testing.T) {
func TestFileSource_OffsetsAfterRestart(t *testing.T) {
t.Parallel()
// Create a new source
fakeOutput := testutil.NewFakeOutput(t)
tempDir := testutil.NewTempDir(t)
cfg := newDefaultConfig(tempDir)
buildContext := testutil.NewBuildContext(t)
pg, err := cfg.Build(buildContext)
require.NoError(t, err)
err = pg.SetOutputs([]operator.Operator{fakeOutput})
require.NoError(t, err)
source1 := pg.(*InputOperator)
source, logReceived, tempDir := newTestFileSource(t, nil)

temp1 := openTemp(t, tempDir)
writeString(t, temp1, "testlog1\n")

// Start the source and expect a message
require.NoError(t, source1.Start())
waitForMessage(t, fakeOutput.Received, "testlog1")
require.NoError(t, source.Start())
waitForMessage(t, logReceived, "testlog1")

// Restart the source. Stop and build a new
// one to guarantee freshness
require.NoError(t, source1.Stop())
pg, err = cfg.Build(buildContext)
require.NoError(t, err)
err = pg.SetOutputs([]operator.Operator{fakeOutput})
require.NoError(t, err)
source2 := pg.(*InputOperator)

require.NoError(t, source2.Start())
require.NoError(t, source.Stop())
require.NoError(t, source.Start())

// Write a new log and expect only that log
writeString(t, temp1, "testlog2\n")
waitForMessage(t, fakeOutput.Received, "testlog2")
waitForMessage(t, logReceived, "testlog2")
}

func TestFileSource_OffsetsAfterRestart_BigFiles(t *testing.T) {
Expand Down
9 changes: 6 additions & 3 deletions operator/builtin/input/file/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,9 @@ func (f *FileReader) ReadToEnd(ctx context.Context) {
break
}

// TODO if context is cancelled, we don't want to update offset
f.emit(ctx, scanner.Bytes())
if err := f.emit(ctx, scanner.Bytes()); err != nil {
f.Error("Failed to emit entry", zap.Error(err))
}
f.setOffset(scanner.Pos())
}

Expand All @@ -125,7 +126,9 @@ func (f *FileReader) ReadToEnd(ctx context.Context) {
f.Errorw("Failed reading trailing entry", zap.Error(err))
return
}
f.emit(ctx, msgBuf[:n])
if err := f.emit(ctx, msgBuf[:n]); err != nil {
f.Error("Failed to emit entry", zap.Error(err))
}
f.setOffset(scanner.Pos() + int64(n))
}
}
Expand Down

0 comments on commit e8ca3a8

Please sign in to comment.