From 8c53231f51e42ada7820e00825a4df3efc3f130e Mon Sep 17 00:00:00 2001 From: Camden Cheek Date: Tue, 8 Sep 2020 10:46:14 -0400 Subject: [PATCH] Update tests --- operator/builtin/input/file/config.go | 3 + operator/builtin/input/file/file.go | 41 +- operator/builtin/input/file/file_test.go | 617 +++++++++-------------- operator/builtin/input/file/reader.go | 3 +- 4 files changed, 256 insertions(+), 408 deletions(-) diff --git a/operator/builtin/input/file/config.go b/operator/builtin/input/file/config.go index 5b06a0a63..d7287aa36 100644 --- a/operator/builtin/input/file/config.go +++ b/operator/builtin/input/file/config.go @@ -124,6 +124,9 @@ func (c InputConfig) Build(context operator.BuildContext) (operator.Operator, er fingerprintBytes: 1000, startAtBeginning: startAtBeginning, encoding: encoding, + firstCheck: true, + cancel: func() {}, + knownFiles: make(map[string]*FileReader), MaxLogSize: c.MaxLogSize, } diff --git a/operator/builtin/input/file/file.go b/operator/builtin/input/file/file.go index 290c2dd6f..b5cddafe0 100644 --- a/operator/builtin/input/file/file.go +++ b/operator/builtin/input/file/file.go @@ -37,15 +37,16 @@ type InputOperator struct { encoding encoding.Encoding - wg *sync.WaitGroup - cancel context.CancelFunc + wg sync.WaitGroup + firstCheck bool + cancel context.CancelFunc } // Start will start the file monitoring process func (f *InputOperator) Start() error { ctx, cancel := context.WithCancel(context.Background()) f.cancel = cancel - f.wg = &sync.WaitGroup{} + f.firstCheck = true // Load offsets from disk if err := f.loadKnownFiles(); err != nil { @@ -56,7 +57,7 @@ func (f *InputOperator) Start() error { f.wg.Add(1) go func() { defer f.wg.Done() - f.pollForNewFiles(ctx) + f.runPoller(ctx) }() return nil @@ -67,14 +68,15 @@ func (f *InputOperator) Stop() error { f.cancel() f.wg.Wait() f.syncKnownFiles() + f.knownFiles = nil + f.cancel = nil return nil } -func (f *InputOperator) pollForNewFiles(ctx context.Context) { +func (f *InputOperator) runPoller(ctx context.Context) { globTicker := time.NewTicker(f.PollInterval) defer globTicker.Stop() - firstCheck := true for { select { case <-ctx.Done(): @@ -82,18 +84,23 @@ func (f *InputOperator) pollForNewFiles(ctx context.Context) { case <-globTicker.C: } - f.syncKnownFiles() - // TODO clean unseen files from our list of known files. This grows unbound - // if the files rotate - matches := getMatches(f.Include, f.Exclude) - if firstCheck && len(matches) == 0 { - f.Warnw("no files match the configured include patterns", "include", f.Include) - } - for _, match := range matches { - f.checkPath(ctx, match, firstCheck) - } - firstCheck = false + f.poll(ctx) } +} + +func (f *InputOperator) poll(ctx context.Context) { + f.syncKnownFiles() + // TODO clean unseen files from our list of known files. This grows unbound + // if the files rotate + matches := getMatches(f.Include, f.Exclude) + if f.firstCheck && len(matches) == 0 { + f.Warnw("no files match the configured include patterns", "include", f.Include) + } + for _, match := range matches { + f.checkPath(ctx, match, f.firstCheck) + } + + f.firstCheck = false } diff --git a/operator/builtin/input/file/file_test.go b/operator/builtin/input/file/file_test.go index 74594358a..4d395f481 100644 --- a/operator/builtin/input/file/file_test.go +++ b/operator/builtin/input/file/file_test.go @@ -1,6 +1,7 @@ package file import ( + "context" "fmt" "io" "io/ioutil" @@ -15,39 +16,55 @@ import ( "github.com/observiq/stanza/entry" "github.com/observiq/stanza/operator" "github.com/observiq/stanza/testutil" - "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" ) -func newTestFileSource(t *testing.T) (*InputOperator, chan *entry.Entry) { - mockOutput := testutil.NewMockOperator("output") - receivedEntries := make(chan *entry.Entry, 1000) - mockOutput.On("Process", mock.Anything, mock.Anything).Return(nil).Run(func(args mock.Arguments) { - receivedEntries <- args.Get(1).(*entry.Entry) - }) - +func newDefaultConfig(tempDir string) *InputConfig { cfg := NewInputConfig("testfile") cfg.PollInterval = operator.Duration{Duration: 50 * time.Millisecond} cfg.StartAt = "beginning" - cfg.Include = []string{"should-be-overwritten"} + cfg.Include = []string{fmt.Sprintf("%s/*", tempDir)} + cfg.OutputIDs = []string{"fake"} + return cfg +} + +func newTestFileSource(t *testing.T, cfgMod func(*InputConfig)) (*InputOperator, chan *entry.Entry, string) { + fakeOutput := testutil.NewFakeOutput(t) + tempDir := testutil.NewTempDir(t) + cfg := newDefaultConfig(tempDir) + if cfgMod != nil { + cfgMod(cfg) + } pg, err := cfg.Build(testutil.NewBuildContext(t)) if err != nil { t.Fatalf("Error building operator: %s", err) } - source := pg.(*InputOperator) - source.OutputOperators = []operator.Operator{mockOutput} + err = pg.SetOutputs([]operator.Operator{fakeOutput}) + require.NoError(t, err) - return source, receivedEntries + return pg.(*InputOperator), fakeOutput.Received, tempDir +} + +func openTemp(t testing.TB, tempDir string) *os.File { + file, err := ioutil.TempFile(tempDir, "") + require.NoError(t, err) + t.Cleanup(func() { _ = file.Close() }) + return file +} + +func writeString(t testing.TB, file *os.File, s string) { + _, err := file.WriteString(s) + require.NoError(t, err) } func TestFileSource_Build(t *testing.T) { t.Parallel() - mockOutput := testutil.NewMockOperator("mock") + fakeOutput := testutil.NewMockOperator("fake") basicConfig := func() *InputConfig { cfg := NewInputConfig("testfile") - cfg.OutputIDs = []string{"mock"} + cfg.OutputIDs = []string{"fake"} cfg.Include = []string{"/var/log/testpath.*"} cfg.Exclude = []string{"/var/log/testpath.ex*"} cfg.PollInterval = operator.Duration{Duration: 10 * time.Millisecond} @@ -65,7 +82,7 @@ func TestFileSource_Build(t *testing.T) { func(f *InputConfig) { return }, require.NoError, func(t *testing.T, f *InputOperator) { - require.Equal(t, f.OutputOperators[0], mockOutput) + require.Equal(t, f.OutputOperators[0], fakeOutput) require.Equal(t, f.Include, []string{"/var/log/testpath.*"}) require.Equal(t, f.FilePathField, entry.NewNilField()) require.Equal(t, f.FileNameField, entry.NewLabelField("file_name")) @@ -123,6 +140,7 @@ func TestFileSource_Build(t *testing.T) { for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { + t.Parallel() cfg := basicConfig() tc.modifyBaseConfig(cfg) @@ -132,7 +150,7 @@ func TestFileSource_Build(t *testing.T) { return } - err = plg.SetOutputs([]operator.Operator{mockOutput}) + err = plg.SetOutputs([]operator.Operator{fakeOutput}) require.NoError(t, err) fileInput := plg.(*InputOperator) @@ -147,215 +165,146 @@ func TestFileSource_CleanStop(t *testing.T) { See this issue for details: https://github.com/census-instrumentation/opencensus-go/issues/1191#issuecomment-610440163`) // defer goleak.VerifyNone(t) - source, _ := newTestFileSource(t) - - tempDir := testutil.NewTempDir(t) - - tempFile, err := ioutil.TempFile(tempDir, "") - require.NoError(t, err) - - source.Include = []string{tempFile.Name()} - - err = source.Start() + source, _, tempDir := newTestFileSource(t, nil) + _ = openTemp(t, tempDir) + err := source.Start() require.NoError(t, err) source.Stop() } -func TestFileSource_AddFields(t *testing.T) { +// AddFields tests that the `file_name` and `file_path` fields are included +// when IncludeFileName and IncludeFilePath are set to true +func TestFileSource_AddFileFields(t *testing.T) { t.Parallel() - source, logReceived := newTestFileSource(t) - tempDir := testutil.NewTempDir(t) - source.Include = []string{fmt.Sprintf("%s/*", tempDir)} - source.FilePathField = entry.NewLabelField("path") - source.FileNameField = entry.NewLabelField("file_name") + source, logReceived, tempDir := newTestFileSource(t, func(cfg *InputConfig) { + cfg.IncludeFileName = true + cfg.IncludeFilePath = true + }) // Create a file, then start - temp, err := ioutil.TempFile(tempDir, "") - require.NoError(t, err) - defer temp.Close() - - _, err = temp.WriteString("testlog\n") - require.NoError(t, err) + temp := openTemp(t, tempDir) + writeString(t, temp, "testlog\n") - err = source.Start() - require.NoError(t, err) + require.NoError(t, source.Start()) defer source.Stop() - select { - case e := <-logReceived: - require.Equal(t, filepath.Base(temp.Name()), e.Labels["file_name"]) - require.Equal(t, temp.Name(), e.Labels["path"]) - case <-time.After(time.Second): - require.FailNow(t, "Timed out waiting for message") - } + e := waitForOne(t, logReceived) + require.Equal(t, filepath.Base(temp.Name()), e.Labels["file_name"]) + require.Equal(t, temp.Name(), e.Labels["file_path"]) } +// ReadExistingLogs tests that, when starting from beginning, we +// read all the lines that are already there func TestFileSource_ReadExistingLogs(t *testing.T) { t.Parallel() - source, logReceived := newTestFileSource(t) - tempDir := testutil.NewTempDir(t) - source.Include = []string{fmt.Sprintf("%s/*", tempDir)} - - // Create a file, then start - temp, err := ioutil.TempFile(tempDir, "") - require.NoError(t, err) - defer temp.Close() - - _, err = temp.WriteString("testlog\n") - require.NoError(t, err) - - err = source.Start() - require.NoError(t, err) - defer source.Stop() - - waitForMessage(t, logReceived, "testlog") -} - -func TestFileSource_IncludeFile(t *testing.T) { - t.Parallel() - source, logReceived := newTestFileSource(t) - tempDir := testutil.NewTempDir(t) - source.Include = []string{fmt.Sprintf("%s/*", tempDir)} - source.FileNameField = entry.NewLabelField("file_name") - source.FilePathField = entry.NewLabelField("file_path") + source, logReceived, tempDir := newTestFileSource(t, nil) // Create a file, then start - temp, err := ioutil.TempFile(tempDir, "") - require.NoError(t, err) - defer temp.Close() - - _, err = temp.WriteString("testlog\n") - require.NoError(t, err) + temp := openTemp(t, tempDir) + writeString(t, temp, "testlog1\ntestlog2\n") - err = source.Start() - require.NoError(t, err) + require.NoError(t, source.Start()) defer source.Stop() - select { - case e := <-logReceived: - require.Equal(t, e.Labels["file_name"], filepath.Base(temp.Name())) - require.Equal(t, e.Labels["file_path"], temp.Name()) - case <-time.After(time.Second): - require.FailNow(t, "Timed out waiting for message") - } + waitForMessage(t, logReceived, "testlog1") + waitForMessage(t, logReceived, "testlog2") } +// ReadNewLogs tests that, after starting, if a new file is created +// all the entries in that file are read from the beginning func TestFileSource_ReadNewLogs(t *testing.T) { t.Parallel() - source, logReceived := newTestFileSource(t) - tempDir := testutil.NewTempDir(t) - source.Include = []string{fmt.Sprintf("%s/*", tempDir)} + source, logReceived, tempDir := newTestFileSource(t, nil) - // Start first, then create a new file - err := source.Start() - require.NoError(t, err) + // Poll once so we know this isn't a new file + source.poll(context.Background()) defer source.Stop() - temp, err := ioutil.TempFile(tempDir, "") - require.NoError(t, err) - defer temp.Close() + // Create a new file + temp := openTemp(t, tempDir) + writeString(t, temp, "testlog\n") - _, err = temp.WriteString("testlog\n") - require.NoError(t, err) + // Poll a second time after the file has been created + source.poll(context.Background()) + // Expect the message to come through waitForMessage(t, logReceived, "testlog") } +// ReadExistingAndNewLogs tests that, on startup, if start_at +// is set to `beginning`, we read the logs that are there, and +// we read any additional logs that are written after startup func TestFileSource_ReadExistingAndNewLogs(t *testing.T) { t.Parallel() - source, logReceived := newTestFileSource(t) - tempDir := testutil.NewTempDir(t) - source.Include = []string{fmt.Sprintf("%s/*", tempDir)} - - temp, err := ioutil.TempFile(tempDir, "") - require.NoError(t, err) - defer temp.Close() + source, logReceived, tempDir := newTestFileSource(t, nil) - _, err = temp.WriteString("testlog1\n") - require.NoError(t, err) - - err = source.Start() - require.NoError(t, err) + // Start with a file with an entry in it, and expect that entry + // to come through when we poll for the first time + temp := openTemp(t, tempDir) + writeString(t, temp, "testlog1\n") + source.poll(context.Background()) defer source.Stop() - - _, err = temp.WriteString("testlog2\n") - require.NoError(t, err) - waitForMessage(t, logReceived, "testlog1") + + // Write a second entry, and expect that entry to come through + // as well + writeString(t, temp, "testlog2\n") + source.poll(context.Background()) waitForMessage(t, logReceived, "testlog2") } +// StartAtEnd tests that when `start_at` is configured to `end`, +// we don't read any entries that were in the file before startup func TestFileSource_StartAtEnd(t *testing.T) { t.Parallel() - source, logReceived := newTestFileSource(t) - tempDir := testutil.NewTempDir(t) - source.Include = []string{fmt.Sprintf("%s/*", tempDir)} - source.startAtBeginning = false - - temp, err := ioutil.TempFile(tempDir, "") - require.NoError(t, err) - defer temp.Close() + source, logReceived, tempDir := newTestFileSource(t, func(cfg *InputConfig) { + cfg.StartAt = "end" + }) - _, err = temp.WriteString("testlog1\n") - require.NoError(t, err) + temp := openTemp(t, tempDir) + writeString(t, temp, "testlog1\n") - err = source.Start() - require.NoError(t, err) + // Expect no entries on the first poll + source.poll(context.Background()) defer source.Stop() + expectNoMessages(t, logReceived) - // Wait until file has been read the first time - time.Sleep(200 * time.Millisecond) - - _, err = temp.WriteString("testlog2\n") - require.NoError(t, err) - temp.Close() - + // Expect any new entries after the first poll + writeString(t, temp, "testlog2\n") + source.poll(context.Background()) waitForMessage(t, logReceived, "testlog2") } +// StartAtEndNewFile tests that when `start_at` is configured to `end`, +// a file created after the source has been started is read from the +// beginning func TestFileSource_StartAtEndNewFile(t *testing.T) { t.Parallel() - source, logReceived := newTestFileSource(t) - tempDir := testutil.NewTempDir(t) - source.Include = []string{fmt.Sprintf("%s/*", tempDir)} + source, logReceived, tempDir := newTestFileSource(t, nil) source.startAtBeginning = false - err := source.Start() - require.NoError(t, err) + source.poll(context.Background()) defer source.Stop() - // Wait for the first check to complete - time.Sleep(200 * time.Millisecond) - - temp, err := ioutil.TempFile(tempDir, "") - require.NoError(t, err) - defer temp.Close() - - _, err = temp.WriteString("testlog1\ntestlog2\n") - require.NoError(t, err) + temp := openTemp(t, tempDir) + writeString(t, temp, "testlog1\ntestlog2\n") + source.poll(context.Background()) waitForMessage(t, logReceived, "testlog1") waitForMessage(t, logReceived, "testlog2") } func TestFileSource_MultiFileSimple(t *testing.T) { t.Parallel() - source, logReceived := newTestFileSource(t) - tempDir := testutil.NewTempDir(t) - source.Include = []string{fmt.Sprintf("%s/*", tempDir)} + source, logReceived, tempDir := newTestFileSource(t, nil) - temp1, err := ioutil.TempFile(tempDir, "") - require.NoError(t, err) - temp2, err := ioutil.TempFile(tempDir, "") - require.NoError(t, err) + temp1 := openTemp(t, tempDir) + temp2 := openTemp(t, tempDir) - _, err = temp1.WriteString("testlog1\n") - require.NoError(t, err) - _, err = temp2.WriteString("testlog2\n") - require.NoError(t, err) + writeString(t, temp1, "testlog1\n") + writeString(t, temp2, "testlog2\n") - err = source.Start() - require.NoError(t, err) + require.NoError(t, source.Start()) defer source.Stop() waitForMessages(t, logReceived, []string{"testlog1", "testlog2"}) @@ -366,317 +315,200 @@ func TestFileSource_MoveFile(t *testing.T) { t.Skip("Moving files while open is unsupported on Windows") } t.Parallel() - source, logReceived := newTestFileSource(t) - tempDir := testutil.NewTempDir(t) - source.Include = []string{fmt.Sprintf("%s/*", tempDir)} - - temp1, err := ioutil.TempFile(tempDir, "") - require.NoError(t, err) + source, logReceived, tempDir := newTestFileSource(t, nil) - _, err = temp1.WriteString("testlog1\n") - require.NoError(t, err) + temp1 := openTemp(t, tempDir) + writeString(t, temp1, "testlog1\n") temp1.Close() - err = source.Start() - require.NoError(t, err) + source.poll(context.Background()) defer source.Stop() waitForMessage(t, logReceived, "testlog1") - time.Sleep(200 * time.Millisecond) - - i := 0 - for { - err = os.Rename(temp1.Name(), fmt.Sprintf("%s.2", temp1.Name())) - if err != nil { - if i < 3 { - t.Error(err) - i++ - time.Sleep(10 * time.Millisecond) - continue - } else { - require.NoError(t, err) - } - } - break - } + // Wait until all goroutines are finished before renaming + source.wg.Wait() + err := os.Rename(temp1.Name(), fmt.Sprintf("%s.2", temp1.Name())) + require.NoError(t, err) + source.poll(context.Background()) expectNoMessages(t, logReceived) } +// TruncateThenWrite tests that, after a file has been truncated, +// any new writes are picked up func TestFileSource_TruncateThenWrite(t *testing.T) { t.Parallel() - source, logReceived := newTestFileSource(t) - tempDir := testutil.NewTempDir(t) - source.Include = []string{fmt.Sprintf("%s/*", tempDir)} - - temp1, err := ioutil.TempFile(tempDir, "") - require.NoError(t, err) + source, logReceived, tempDir := newTestFileSource(t, nil) - _, err = temp1.WriteString("testlog1\n") - require.NoError(t, err) - _, err = temp1.WriteString("testlog2\n") - require.NoError(t, err) + temp1 := openTemp(t, tempDir) + writeString(t, temp1, "testlog1\ntestlog2\n") - err = source.Start() - require.NoError(t, err) + source.poll(context.Background()) defer source.Stop() waitForMessage(t, logReceived, "testlog1") waitForMessage(t, logReceived, "testlog2") - err = temp1.Truncate(0) - require.NoError(t, err) + require.NoError(t, temp1.Truncate(0)) temp1.Seek(0, 0) - _, err = temp1.WriteString("testlog3\n") - require.NoError(t, err) - + writeString(t, temp1, "testlog3\n") + source.poll(context.Background()) waitForMessage(t, logReceived, "testlog3") expectNoMessages(t, logReceived) } +// CopyTruncateWriteBoth tests that when a file is copied +// with unread logs on the end, then the original is truncated, +// we get the unread logs on the copy as well as any new logs +// written to the truncated file func TestFileSource_CopyTruncateWriteBoth(t *testing.T) { t.Parallel() - source, logReceived := newTestFileSource(t) - tempDir := testutil.NewTempDir(t) - source.Include = []string{fmt.Sprintf("%s/*", tempDir)} + source, logReceived, tempDir := newTestFileSource(t, nil) - temp1, err := ioutil.TempFile(tempDir, "") - require.NoError(t, err) - defer temp1.Close() - - _, err = temp1.WriteString("testlog1\n") - require.NoError(t, err) - _, err = temp1.WriteString("testlog2\n") - require.NoError(t, err) + temp1 := openTemp(t, tempDir) + writeString(t, temp1, "testlog1\ntestlog2\n") - err = source.Start() - require.NoError(t, err) + source.poll(context.Background()) defer source.Stop() waitForMessage(t, logReceived, "testlog1") waitForMessage(t, logReceived, "testlog2") + source.wg.Wait() // wait for all goroutines to finish - time.Sleep(50 * time.Millisecond) - - temp2, err := ioutil.TempFile(tempDir, "") - require.NoError(t, err) - defer temp2.Close() - - _, err = io.Copy(temp1, temp2) + // Copy the first file to a new file, and add another log + temp2 := openTemp(t, tempDir) + _, err := io.Copy(temp2, temp1) require.NoError(t, err) // Truncate original file - err = temp1.Truncate(0) + require.NoError(t, temp1.Truncate(0)) temp1.Seek(0, 0) - require.NoError(t, err) // Write to original and new file - _, err = temp1.WriteString("testlog3\n") - require.NoError(t, err) - - waitForMessage(t, logReceived, "testlog3") + writeString(t, temp2, "testlog3\n") + writeString(t, temp1, "testlog4\n") - _, err = temp2.WriteString("testlog4\n") - require.NoError(t, err) - - waitForMessage(t, logReceived, "testlog4") + // Expect both messages to come through + source.poll(context.Background()) + waitForMessages(t, logReceived, []string{"testlog3", "testlog4"}) } +// OffsetsAfterRestart tests that a source is able to load +// its offsets after a restart func TestFileSource_OffsetsAfterRestart(t *testing.T) { t.Parallel() - source, logReceived := newTestFileSource(t) + // Create a new source + fakeOutput := testutil.NewFakeOutput(t) tempDir := testutil.NewTempDir(t) - source.Include = []string{fmt.Sprintf("%s/*", tempDir)} - - temp1, err := ioutil.TempFile(tempDir, "") + cfg := newDefaultConfig(tempDir) + buildContext := testutil.NewBuildContext(t) + pg, err := cfg.Build(buildContext) require.NoError(t, err) - - // Write to a file - _, err = temp1.WriteString("testlog1\n") + err = pg.SetOutputs([]operator.Operator{fakeOutput}) require.NoError(t, err) + source1 := pg.(*InputOperator) - // Start the source - err = source.Start() - require.NoError(t, err) - defer source.Stop() + temp1 := openTemp(t, tempDir) + writeString(t, temp1, "testlog1\n") - waitForMessage(t, logReceived, "testlog1") + // Start the source and expect a message + require.NoError(t, source1.Start()) + waitForMessage(t, fakeOutput.Received, "testlog1") - // Restart the source - err = source.Stop() + // Restart the source. Stop and build a new + // one to guarantee freshness + require.NoError(t, source1.Stop()) + pg, err = cfg.Build(buildContext) require.NoError(t, err) - err = source.Start() + err = pg.SetOutputs([]operator.Operator{fakeOutput}) require.NoError(t, err) + source2 := pg.(*InputOperator) - // Write a new log - _, err = temp1.WriteString("testlog2\n") - require.NoError(t, err) + require.NoError(t, source2.Start()) - waitForMessage(t, logReceived, "testlog2") + // Write a new log and expect only that log + writeString(t, temp1, "testlog2\n") + waitForMessage(t, fakeOutput.Received, "testlog2") } func TestFileSource_OffsetsAfterRestart_BigFiles(t *testing.T) { t.Parallel() - source, logReceived := newTestFileSource(t) - tempDir := testutil.NewTempDir(t) - source.Include = []string{fmt.Sprintf("%s/*", tempDir)} + source, logReceived, tempDir := newTestFileSource(t, nil) - log1 := stringWithLength(1000) - log2 := stringWithLength(1000) + log1 := stringWithLength(2000) + log2 := stringWithLength(2000) - temp1, err := ioutil.TempFile(tempDir, "") - require.NoError(t, err) - - // Write to a file - _, err = temp1.WriteString(log1 + "\n") - require.NoError(t, err) + temp1 := openTemp(t, tempDir) + writeString(t, temp1, log1+"\n") // Start the source - err = source.Start() - require.NoError(t, err) - + require.NoError(t, source.Start()) + defer source.Stop() waitForMessage(t, logReceived, log1) // Restart the source - err = source.Stop() - require.NoError(t, err) - err = source.Start() - require.NoError(t, err) - defer source.Stop() - - _, err = temp1.WriteString(log2 + "\n") - require.NoError(t, err) + require.NoError(t, source.Stop()) + require.NoError(t, source.Start()) + writeString(t, temp1, log2+"\n") waitForMessage(t, logReceived, log2) } func TestFileSource_OffsetsAfterRestart_BigFilesWrittenWhileOff(t *testing.T) { t.Parallel() - source, logReceived := newTestFileSource(t) - tempDir := testutil.NewTempDir(t) - source.Include = []string{fmt.Sprintf("%s/*", tempDir)} - - log1 := stringWithLength(1000) - log2 := stringWithLength(1000) - - temp1, err := ioutil.TempFile(tempDir, "") - require.NoError(t, err) + source, logReceived, tempDir := newTestFileSource(t, nil) - // Write to a file - _, err = temp1.WriteString(log1 + "\n") - require.NoError(t, err) + log1 := stringWithLength(2000) + log2 := stringWithLength(2000) - // Start the source - err = source.Start() - require.NoError(t, err) + temp := openTemp(t, tempDir) + writeString(t, temp, log1+"\n") + // Start the source and expect the first message + require.NoError(t, source.Start()) + defer source.Stop() waitForMessage(t, logReceived, log1) - // Restart the source - err = source.Stop() - require.NoError(t, err) - - _, err = temp1.WriteString(log2 + "\n") - require.NoError(t, err) - - err = source.Start() - require.NoError(t, err) - defer source.Stop() + // Stop the source and write a new message + require.NoError(t, source.Stop()) + writeString(t, temp, log2+"\n") + // Start the source and expect the message + require.NoError(t, source.Start()) waitForMessage(t, logReceived, log2) } func TestFileSource_FileMovedWhileOff_BigFiles(t *testing.T) { t.Parallel() - source, logReceived := newTestFileSource(t) - tempDir := testutil.NewTempDir(t) - source.Include = []string{fmt.Sprintf("%s/*", tempDir)} + source, logReceived, tempDir := newTestFileSource(t, nil) log1 := stringWithLength(1000) log2 := stringWithLength(1000) - temp1, err := ioutil.TempFile(tempDir, "") - require.NoError(t, err) - - // Write to a file - _, err = temp1.WriteString(log1 + "\n") - require.NoError(t, err) + temp := openTemp(t, tempDir) + writeString(t, temp, log1+"\n") // Start the source - err = source.Start() - require.NoError(t, err) - - waitForMessage(t, logReceived, log1) - - // Stop the source, then rename and write a new log - err = source.Stop() - require.NoError(t, err) - - _, err = temp1.WriteString(log2 + "\n") - require.NoError(t, err) - temp1.Close() - - err = os.Rename(temp1.Name(), fmt.Sprintf("%s2", temp1.Name())) - require.NoError(t, err) - - err = source.Start() - require.NoError(t, err) + require.NoError(t, source.Start()) defer source.Stop() - - waitForMessage(t, logReceived, log2) -} - -func TestFileSource_FileMovedWhileOff_SmallFiles(t *testing.T) { - t.Parallel() - source, logReceived := newTestFileSource(t) - tempDir := testutil.NewTempDir(t) - source.Include = []string{fmt.Sprintf("%s/*", tempDir)} - - log1 := stringWithLength(10) - log2 := stringWithLength(10) - - temp1, err := ioutil.TempFile(tempDir, "") - require.NoError(t, err) - - // Write to a file - _, err = temp1.WriteString(log1 + "\n") - require.NoError(t, err) - - // Start the source - err = source.Start() - require.NoError(t, err) - waitForMessage(t, logReceived, log1) - time.Sleep(50 * time.Millisecond) - - // Restart the source - err = source.Stop() - require.NoError(t, err) - - _, err = temp1.WriteString(log2 + "\n") - require.NoError(t, err) - temp1.Close() - err = os.Rename(temp1.Name(), fmt.Sprintf("%s2", temp1.Name())) - require.NoError(t, err) - - err = source.Start() + // Stop the source, then rename and write a new log + require.NoError(t, source.Stop()) + writeString(t, temp, log2+"\n") + err := os.Rename(temp.Name(), fmt.Sprintf("%s2", temp.Name())) require.NoError(t, err) - defer source.Stop() + // Expect the message written to the new log to come through + require.NoError(t, source.Start()) waitForMessage(t, logReceived, log2) } func TestFileSource_ManyLogsDelivered(t *testing.T) { t.Parallel() - source, logReceived := newTestFileSource(t) - tempDir := testutil.NewTempDir(t) - source.Include = []string{fmt.Sprintf("%s/*", tempDir)} - - temp1, err := ioutil.TempFile(tempDir, "") - require.NoError(t, err) + source, logReceived, tempDir := newTestFileSource(t, nil) count := 1000 expectedMessages := make([]string, 0, count) @@ -685,20 +517,19 @@ func TestFileSource_ManyLogsDelivered(t *testing.T) { } // Start the source - err = source.Start() - require.NoError(t, err) + require.NoError(t, source.Start()) defer source.Stop() // Write lots of logs + temp := openTemp(t, tempDir) for _, message := range expectedMessages { - temp1.WriteString(message + "\n") + temp.WriteString(message + "\n") } - // Expect each of them to come through + // Expect each of them to come through once for _, message := range expectedMessages { waitForMessage(t, logReceived, message) } - expectNoMessages(t, logReceived) } @@ -711,6 +542,16 @@ func stringWithLength(length int) string { return string(b) } +func waitForOne(t *testing.T, c chan *entry.Entry) *entry.Entry { + select { + case e := <-c: + return e + case <-time.After(time.Minute): + require.FailNow(t, "Timed out waiting for message") + return nil + } +} + func waitForMessage(t *testing.T, c chan *entry.Entry, expected string) { select { case e := <-c: @@ -806,21 +647,17 @@ func TestEncodings(t *testing.T) { for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { - tempDir := testutil.NewTempDir(t) - path := filepath.Join(tempDir, "in.log") - err := ioutil.WriteFile(path, tc.contents, 0777) + t.Parallel() + source, receivedEntries, tempDir := newTestFileSource(t, func(cfg *InputConfig) { + cfg.Encoding = tc.encoding + }) + + // Popualte the file + temp := openTemp(t, tempDir) + _, err := temp.Write(tc.contents) require.NoError(t, err) - source, receivedEntries := newTestFileSource(t) - source.Include = []string{path} - source.encoding, err = lookupEncoding(tc.encoding) - require.NoError(t, err) - source.SplitFunc, err = NewNewlineSplitFunc(source.encoding) - require.NoError(t, err) - require.NotNil(t, source.encoding) - - err = source.Start() - require.NoError(t, err) + require.NoError(t, source.Start()) defer source.Stop() for _, expected := range tc.expected { diff --git a/operator/builtin/input/file/reader.go b/operator/builtin/input/file/reader.go index 46e1e3ef7..ec3a682bf 100644 --- a/operator/builtin/input/file/reader.go +++ b/operator/builtin/input/file/reader.go @@ -83,6 +83,7 @@ func (f *FileReader) ReadToEnd(ctx context.Context) { f.Errorw("Failed opening file", zap.Error(err)) return } + defer file.Close() lr := io.LimitReader(file, f.LastSeenFileSize-f.Offset) scanner := NewPositionalScanner(lr, f.fileInput.MaxLogSize, f.Offset, f.fileInput.SplitFunc) @@ -111,7 +112,7 @@ func (f *FileReader) ReadToEnd(ctx context.Context) { // If we're not at the end of the file, and we haven't // advanced since last cycle, read the rest of the file as an entry atFileEnd := scanner.Pos() == f.LastSeenFileSize - if !atFileEnd && fileSizeHasChanged { // TODO why did we have scanner.Pos() == f.offset in here? + if !atFileEnd && !fileSizeHasChanged { _, err := file.Seek(scanner.Pos(), 0) if err != nil { f.Errorw("Failed to seek for trailing entry", zap.Error(err))