diff --git a/internal/pkg/source/entry.go b/internal/pkg/source/entry.go index 3cdee61..82d6d51 100644 --- a/internal/pkg/source/entry.go +++ b/internal/pkg/source/entry.go @@ -22,12 +22,15 @@ type LazyLogEntry struct { length int } +// Length of the entry. func (e LazyLogEntry) Length() int { return e.length } +// Line re-reads the line. func (e LazyLogEntry) Line(file *os.File) (json.RawMessage, error) { data := make([]byte, e.length) + _, err := file.ReadAt(data, e.offset) if err != nil { return nil, err @@ -45,7 +48,7 @@ func (e LazyLogEntry) LogEntry(file *os.File, cfg *config.Config) LogEntry { } } - return ParseLogEntry(line, cfg) + return parseLogEntry(line, cfg) } // LogEntry is a single partly-parse record of the log. @@ -90,6 +93,7 @@ func (entries LazyLogEntries) Filter(term string) (LazyLogEntries, error) { if err != nil { return LazyLogEntries{}, err } + if bytes.Contains(bytes.ToLower(line), termLower) { filtered = append(filtered, f) } @@ -151,8 +155,6 @@ func formatField( return string(ParseLevel(formatMessage(value), cfg.CustomLevelMapping)) case config.FieldKindTime: return formatMessage(value) - case config.FieldKindNumericTime: - return formatMessage(value) case config.FieldKindSecondTime: return formatMessage(formatTimeString(value, "s")) case config.FieldKindMilliTime: @@ -166,8 +168,8 @@ func formatField( } } -// ParseLogEntry parses a single log entry from the json line. -func ParseLogEntry( +// parseLogEntry parses a single log entry from the json line. +func parseLogEntry( line json.RawMessage, cfg *config.Config, ) LogEntry { diff --git a/internal/pkg/source/entry_test.go b/internal/pkg/source/entry_test.go index b3a6401..654c70d 100644 --- a/internal/pkg/source/entry_test.go +++ b/internal/pkg/source/entry_test.go @@ -2,16 +2,20 @@ package source_test import ( "bytes" - "encoding/json" "fmt" + "math" + "os" + "strconv" "strings" "testing" "time" + "github.com/charmbracelet/bubbles/table" "github.com/stretchr/testify/require" "github.com/hedhyw/json-log-viewer/internal/pkg/config" "github.com/hedhyw/json-log-viewer/internal/pkg/source" + "github.com/hedhyw/json-log-viewer/internal/pkg/tests" "github.com/stretchr/testify/assert" ) @@ -29,7 +33,7 @@ func TestParseLogEntryDefault(t *testing.T) { Assert: func(tb testing.TB, fieldKindToValue map[config.FieldKind]string) { tb.Helper() - assert.Equal(t, "Hello World", fieldKindToValue[config.FieldKindMessage], fieldKindToValue) + assert.Equal(t, "Hello World\n", fieldKindToValue[config.FieldKindMessage], fieldKindToValue) assert.Equal(t, "-", fieldKindToValue[config.FieldKindLevel], fieldKindToValue) assert.Equal(t, "-", fieldKindToValue[config.FieldKindNumericTime], fieldKindToValue) }, @@ -177,6 +181,17 @@ func TestParseLogEntryDefault(t *testing.T) { fieldKindToValue, ) }, + }, { + Name: "special", + JSON: `{"msg":"\u0008"}`, + Assert: func(tb testing.TB, fieldKindToValue map[config.FieldKind]string) { + tb.Helper() + + assert.Empty(t, + fieldKindToValue[config.FieldKindMessage], + fieldKindToValue, + ) + }, }, { Name: "level", JSON: `{"level":"INFO"}`, @@ -197,9 +212,8 @@ func TestParseLogEntryDefault(t *testing.T) { cfg := config.GetDefaultConfig() - actual := source.ParseLogEntry(json.RawMessage(testCase.JSON), cfg) - - testCase.Assert(t, getFieldKindToValue(cfg, actual.Fields)) + actual := parseTableRow(t, testCase.JSON, cfg) + testCase.Assert(t, getFieldKindToValue(cfg, actual)) }) } } @@ -236,21 +250,21 @@ func TestLazyLogEntriesFilter(t *testing.T) { `, term) createEntries := func() (*source.Source, source.LazyLogEntries, source.LazyLogEntry) { - is, err := source.Reader(bytes.NewReader([]byte(logs)), config.GetDefaultConfig()) + source, err := source.Reader(bytes.NewReader([]byte(logs)), config.GetDefaultConfig()) require.NoError(t, err) - logEntries, err := is.ParseLogEntries() + logEntries, err := source.ParseLogEntries() require.NoError(t, err) logEntry := logEntries.Entries[1] - return is, logEntries, logEntry + return source, logEntries, logEntry } t.Run("all", func(t *testing.T) { t.Parallel() - is, logEntries, _ := createEntries() - defer is.Close() + source, logEntries, _ := createEntries() + defer source.Close() assert.Len(t, logEntries.Entries, logEntries.Len()) }) @@ -258,8 +272,8 @@ func TestLazyLogEntriesFilter(t *testing.T) { t.Run("found_exact", func(t *testing.T) { t.Parallel() - is, logEntries, logEntry := createEntries() - defer is.Close() + source, logEntries, logEntry := createEntries() + defer source.Close() filtered, err := logEntries.Filter(term) require.NoError(t, err) @@ -271,8 +285,9 @@ func TestLazyLogEntriesFilter(t *testing.T) { t.Run("found_ignore_case", func(t *testing.T) { t.Parallel() - is, logEntries, logEntry := createEntries() - defer is.Close() + + source, logEntries, logEntry := createEntries() + defer source.Close() filtered, err := logEntries.Filter(strings.ToUpper(term)) require.NoError(t, err) @@ -282,44 +297,46 @@ func TestLazyLogEntriesFilter(t *testing.T) { } }) + t.Run("empty", func(t *testing.T) { + t.Parallel() + + source, logEntries, _ := createEntries() + defer source.Close() + + filtered, err := logEntries.Filter("") + require.NoError(t, err) + assert.Len(t, filtered.Entries, logEntries.Len()) + }) + t.Run("not_found", func(t *testing.T) { t.Parallel() - is, logEntries, _ := createEntries() - defer is.Close() + + source, logEntries, _ := createEntries() + defer source.Close() filtered, err := logEntries.Filter(term + " - not found!") require.NoError(t, err) assert.Empty(t, filtered.Entries) }) -} -func getFieldKindToValue(cfg *config.Config, entries []string) map[config.FieldKind]string { - fieldKindToValue := make(map[config.FieldKind]string, len(entries)) + t.Run("seeker_failed", func(t *testing.T) { + t.Parallel() - for i, f := range cfg.Fields { - fieldKindToValue[f.Kind] = entries[i] - } + source, logEntries, _ := createEntries() + defer source.Close() - return fieldKindToValue -} + fileName := tests.RequireCreateFile(t, []byte("")) -type TimeFormattingTestCase struct { - TestName string - JSON string - ExpectedOutput string -} + f, err := os.Open(fileName) + require.NoError(t, err) + require.NoError(t, f.Close()) -func getTimestampFormattingConfig(fieldKind config.FieldKind) *config.Config { - return &config.Config{ - Path: config.PathDefault, - Fields: []config.Field{{ - Title: "Time", - Kind: fieldKind, - References: []string{"$.timestamp", "$.time", "$.t", "$.ts"}, - Width: 30, - }}, - } + logEntries.Seeker = f + + _, err = logEntries.Filter(term + " - not found!") + require.Error(t, err) + }) } func TestSecondTimeFormatting(t *testing.T) { @@ -345,13 +362,18 @@ func TestSecondTimeFormatting(t *testing.T) { TestName: "Seconds (int as string)", JSON: `{"timestamp":"1"}`, ExpectedOutput: expectedOutput, + }, { + TestName: "Seconds (int as string)", + JSON: `{"timestamp":"x"}`, + ExpectedOutput: `x`, }} for _, testCase := range secondsTestCases { t.Run(testCase.TestName, func(t *testing.T) { t.Parallel() - actual := source.ParseLogEntry(json.RawMessage(testCase.JSON), cfg) - assert.Equal(t, testCase.ExpectedOutput, actual.Fields[0]) + + actual := parseTableRow(t, testCase.JSON, cfg) + assert.Equal(t, testCase.ExpectedOutput, actual[0]) }) } } @@ -384,8 +406,9 @@ func TestMillisecondTimeFormatting(t *testing.T) { for _, testCase := range millisecondTestCases { t.Run(testCase.TestName, func(t *testing.T) { t.Parallel() - actual := source.ParseLogEntry(json.RawMessage(testCase.JSON), cfg) - assert.Equal(t, testCase.ExpectedOutput, actual.Fields[0]) + + actual := parseTableRow(t, testCase.JSON, cfg) + assert.Equal(t, testCase.ExpectedOutput, actual[0]) }) } } @@ -418,12 +441,31 @@ func TestMicrosecondTimeFormatting(t *testing.T) { for _, testCase := range microsecondTestCases { t.Run(testCase.TestName, func(t *testing.T) { t.Parallel() - actual := source.ParseLogEntry(json.RawMessage(testCase.JSON), cfg) - assert.Equal(t, testCase.ExpectedOutput, actual.Fields[0]) + + actual := parseTableRow(t, testCase.JSON, cfg) + assert.Equal(t, testCase.ExpectedOutput, actual[0]) }) } } +func TestFormattingUnknown(t *testing.T) { + t.Parallel() + + cfg := getTimestampFormattingConfig(config.FieldKind("unknown")) + + actual := parseTableRow(t, `{"timestamp": 1}`, cfg) + assert.Equal(t, "1", actual[0]) +} + +func TestFormattingAny(t *testing.T) { + t.Parallel() + + cfg := getTimestampFormattingConfig(config.FieldKindAny) + + actual := parseTableRow(t, `{"timestamp": 1}`, cfg) + assert.Equal(t, "1", actual[0]) +} + func TestNumericKindTimeFormatting(t *testing.T) { t.Parallel() @@ -469,13 +511,162 @@ func TestNumericKindTimeFormatting(t *testing.T) { TestName: "float with 14 digits before the decimal is microseconds", JSON: `{"timestamp":12345678900000.222}`, ExpectedOutput: time.Unix(12345678, 0).UTC().Format(time.RFC3339), + }, { + TestName: "max_int64", + JSON: fmt.Sprintf(`{"timestamp":"%d"}`, math.MaxInt64), + ExpectedOutput: strconv.Itoa(math.MaxInt64), + }, { + TestName: "negative", + JSON: `{"timestamp":"-1"}`, + ExpectedOutput: "-1", }} for _, testCase := range numericKindCases { t.Run(testCase.TestName, func(t *testing.T) { t.Parallel() - actual := source.ParseLogEntry(json.RawMessage(testCase.JSON), cfg) - assert.Equal(t, testCase.ExpectedOutput, actual.Fields[0]) + + actual := parseTableRow(t, testCase.JSON, cfg) + assert.Equal(t, testCase.ExpectedOutput, actual[0]) }) } } + +func TestLazyLogEntryLength(t *testing.T) { + t.Parallel() + + entry := t.Name() + "\n" + + logEntry := parseLazyLogEntry(t, entry, config.GetDefaultConfig()) + assert.Equal(t, len(entry), logEntry.Length()) +} + +func TestLazyLogEntryLine(t *testing.T) { + t.Parallel() + + entry := t.Name() + "\n" + + logEntry := parseLazyLogEntry(t, entry, config.GetDefaultConfig()) + assert.Equal(t, len(entry), logEntry.Length()) + + t.Run("success", func(t *testing.T) { + t.Parallel() + + fileName := tests.RequireCreateFile(t, []byte(entry)) + + f, err := os.Open(fileName) + require.NoError(t, err) + + t.Cleanup(func() { assert.NoError(t, f.Close()) }) + + actual, err := logEntry.Line(f) + require.NoError(t, err) + + assert.Equal(t, entry, string(actual)) + }) + + t.Run("failed", func(t *testing.T) { + t.Parallel() + + fileName := tests.RequireCreateFile(t, []byte(entry)) + + f, err := os.Open(fileName) + require.NoError(t, err) + require.NoError(t, f.Close()) + + _, err = logEntry.Line(f) + require.Error(t, err) + }) +} + +func TestLazyLogEntryLogEntry(t *testing.T) { + t.Parallel() + + entry := t.Name() + "\n" + cfg := config.GetDefaultConfig() + + logEntry := parseLazyLogEntry(t, entry, config.GetDefaultConfig()) + assert.Equal(t, len(entry), logEntry.Length()) + + t.Run("success", func(t *testing.T) { + t.Parallel() + + fileName := tests.RequireCreateFile(t, []byte(entry)) + + f, err := os.Open(fileName) + require.NoError(t, err) + + t.Cleanup(func() { assert.NoError(t, f.Close()) }) + + actual := logEntry.LogEntry(f, cfg) + require.NoError(t, actual.Error) + assert.Equal(t, entry, string(actual.Line)) + }) + + t.Run("failed", func(t *testing.T) { + t.Parallel() + + fileName := tests.RequireCreateFile(t, []byte(entry)) + + f, err := os.Open(fileName) + require.NoError(t, err) + require.NoError(t, f.Close()) + + actual := logEntry.LogEntry(f, cfg) + require.Error(t, actual.Error) + }) +} + +func parseLazyLogEntry(tb testing.TB, value string, cfg *config.Config) source.LazyLogEntry { + tb.Helper() + + source, err := source.Reader(strings.NewReader(value), cfg) + require.NoError(tb, err) + + logEntries, err := source.ParseLogEntries() + require.NoError(tb, err) + require.Equal(tb, 1, logEntries.Len()) + + return logEntries.Entries[0] +} + +func parseTableRow(tb testing.TB, value string, cfg *config.Config) table.Row { + tb.Helper() + + source, err := source.Reader(strings.NewReader(value+"\n"), cfg) + require.NoError(tb, err) + + logEntries, err := source.ParseLogEntries() + require.NoError(tb, err) + require.Equal(tb, 1, logEntries.Len(), value) + + return logEntries.Row(cfg, 0) +} + +func getFieldKindToValue(cfg *config.Config, entries []string) map[config.FieldKind]string { + fieldKindToValue := make(map[config.FieldKind]string, len(entries)) + + for i, f := range cfg.Fields { + fieldKindToValue[f.Kind] = entries[i] + } + + return fieldKindToValue +} + +type TimeFormattingTestCase struct { + TestName string + JSON string + ExpectedOutput string +} + +func getTimestampFormattingConfig(fieldKind config.FieldKind) *config.Config { + cfg := config.GetDefaultConfig() + + cfg.Fields = []config.Field{{ + Title: "Time", + Kind: fieldKind, + References: []string{"$.timestamp", "$.time", "$.t", "$.ts"}, + Width: 30, + }} + + return cfg +} diff --git a/internal/pkg/source/source.go b/internal/pkg/source/source.go index 8358642..2a45c06 100644 --- a/internal/pkg/source/source.go +++ b/internal/pkg/source/source.go @@ -4,6 +4,7 @@ import ( "bufio" "bytes" "errors" + "fmt" "io" "os" @@ -14,6 +15,8 @@ import ( const ( maxLineSize = 8 * 1024 * 1024 + + temporaryFilePattern = "jvl-*.log" ) type Source struct { @@ -33,71 +36,74 @@ type Source struct { maxSize int64 } -func (is *Source) Close() (err error) { - err = is.file.Close() - e := is.Seeker.Close() - if e != nil { - err = e - } - return err +func (s *Source) Close() error { + return errors.Join(s.file.Close(), s.Seeker.Close()) } // File creates a new Source for reading log entries from a file. func File(name string, cfg *config.Config) (*Source, error) { var err error - is := &Source{ + + source := &Source{ maxSize: cfg.MaxFileSizeBytes, name: name, } - is.file, err = os.Open(name) + source.file, err = os.Open(name) if err != nil { - return nil, err + return nil, fmt.Errorf("opening: %w", err) } - is.Seeker, err = os.Open(name) + source.Seeker, err = os.Open(name) if err != nil { - _ = is.file.Close() - return nil, err + return nil, errors.Join(err, source.file.Close()) } - is.reader = bufio.NewReaderSize(io.LimitReader(is.file, is.maxSize), maxLineSize) - return is, nil + source.reader = bufio.NewReaderSize( + io.LimitReader(source.file, source.maxSize), + maxLineSize, + ) + + return source, nil } -// Reader creates a new Source for reading log entries from an io.Reader. This will write the input to a temp file. -// which will be used to seek against. +// Reader creates a new Source for reading log entries from an io.Reader. +// This will write the input to a temp file, which will be used to seek against. func Reader(input io.Reader, cfg *config.Config) (*Source, error) { var err error - is := &Source{ + + source := &Source{ maxSize: cfg.MaxFileSizeBytes, } // We will write the as read to a temp file. Seek against the temp file. - is.file, err = os.CreateTemp("", "jvl-*.log") + source.file, err = os.CreateTemp( + "", // Default directory for temporary files. + temporaryFilePattern, + ) if err != nil { - return nil, err + return nil, fmt.Errorf("creating temporary file: %w", err) } // The io.TeeReader will write the input to the is.file as it is read. - reader := io.TeeReader(input, is.file) + reader := io.TeeReader(input, source.file) // We can now seek against the data that is read in the input io.Reader. - is.Seeker, err = os.Open(is.file.Name()) + source.Seeker, err = os.Open(source.file.Name()) if err != nil { - _ = is.file.Close() - return nil, err + return nil, errors.Join(err, source.file.Close()) } - reader = io.LimitReader(reader, is.maxSize) - is.reader = bufio.NewReaderSize(reader, maxLineSize) - return is, nil + reader = io.LimitReader(reader, source.maxSize) + source.reader = bufio.NewReaderSize(reader, maxLineSize) + + return source, nil } -func (is *Source) ParseLogEntries() (LazyLogEntries, error) { - logEntries := make([]LazyLogEntry, 0, 1000) +func (s *Source) ParseLogEntries() (LazyLogEntries, error) { + logEntries := make([]LazyLogEntry, 0, initialLogSize) for { - entry, err := is.ReadLogEntry() + entry, err := s.readLogEntry() if err != nil { if errors.Is(err, io.EOF) { break @@ -105,58 +111,63 @@ func (is *Source) ParseLogEntries() (LazyLogEntries, error) { return LazyLogEntries{}, err } + logEntries = append(logEntries, entry) } return LazyLogEntries{ - Seeker: is.Seeker, + Seeker: s.Seeker, Entries: logEntries, }, nil } -func (is *Source) CanFollow() bool { - return len(is.name) != 0 +func (s *Source) CanFollow() bool { + return len(s.name) != 0 } const ErrFileTruncated semerr.Error = "file truncated" // ReadLogEntry reads the next ReadLogEntry from the file. -func (is *Source) ReadLogEntry() (LazyLogEntry, error) { +func (s *Source) readLogEntry() (LazyLogEntry, error) { for { - if is.reader == nil { + if s.reader == nil { // If we can't follow the file, or we have reached the max size, we are done. - if !is.CanFollow() || is.offset >= is.maxSize { + if !s.CanFollow() || s.offset >= s.maxSize { return LazyLogEntry{}, io.EOF } - // has the file size changed since we last looked? - info, err := os.Stat(is.name) - if err != nil || is.prevFollowSize == info.Size() { + // Has the file size changed since we last looked? + info, err := os.Stat(s.name) + if err != nil || s.prevFollowSize == info.Size() { return LazyLogEntry{}, io.EOF } - if info.Size() < is.offset { - // the file has been truncated or rolled over, all previous line offsets are invalid. - // we can't recover from this. + if info.Size() < s.offset { + // The file has been truncated or rolled over, all previous line + // offsets are invalid. We can't recover from this. return LazyLogEntry{}, ErrFileTruncated } - is.prevFollowSize = info.Size() - // reset the reader and try to read the file again. - _, _ = is.file.Seek(is.offset, io.SeekStart) - is.reader = bufio.NewReaderSize(io.LimitReader(is.file, is.maxSize-is.offset), maxLineSize) + + s.prevFollowSize = info.Size() + // Reset the reader and try to read the file again. + _, _ = s.file.Seek(s.offset, io.SeekStart) + s.reader = bufio.NewReaderSize(io.LimitReader(s.file, s.maxSize-s.offset), maxLineSize) } - line, err := is.reader.ReadSlice(byte('\n')) + line, err := s.reader.ReadBytes('\n') if err != nil { if errors.Is(err, io.EOF) { - // set the reader to nil so that we can recover from EOF. - is.reader = nil + // Set the reader to nil so that we can recover from EOF. + s.reader = nil } + return LazyLogEntry{}, err } + length := len(line) - offset := is.offset - is.offset += int64(length) + offset := s.offset + s.offset += int64(length) + if len(bytes.TrimSpace(line)) != 0 { return LazyLogEntry{ offset: offset, diff --git a/internal/pkg/source/source_test.go b/internal/pkg/source/source_test.go index 345968c..c21f859 100644 --- a/internal/pkg/source/source_test.go +++ b/internal/pkg/source/source_test.go @@ -4,26 +4,32 @@ import ( "bytes" "strings" "testing" + "testing/iotest" + "github.com/hedhyw/semerr/pkg/v1/semerr" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/hedhyw/json-log-viewer/assets" "github.com/hedhyw/json-log-viewer/internal/pkg/config" "github.com/hedhyw/json-log-viewer/internal/pkg/source" + "github.com/hedhyw/json-log-viewer/internal/pkg/tests" ) -func TestLoadLogsFromFile(t *testing.T) { +func TestParseLogEntries(t *testing.T) { t.Parallel() t.Run("ok", func(t *testing.T) { t.Parallel() reader := bytes.NewReader(assets.ExampleJSONLog()) - is, err := source.Reader(reader, config.GetDefaultConfig()) + + source, err := source.Reader(reader, config.GetDefaultConfig()) require.NoError(t, err) - defer is.Close() - logEntries, err := is.ParseLogEntries() + + t.Cleanup(func() { assert.NoError(t, source.Close()) }) + + logEntries, err := source.ParseLogEntries() require.NoError(t, err) assert.NotEmpty(t, logEntries) @@ -35,14 +41,31 @@ func TestLoadLogsFromFile(t *testing.T) { longLine := strings.Repeat("1", 2*1024*1024) reader := strings.NewReader(longLine) - is, err := source.Reader(reader, config.GetDefaultConfig()) + + source, err := source.Reader(reader, config.GetDefaultConfig()) require.NoError(t, err) - defer is.Close() - logEntries, err := is.ParseLogEntries() + + t.Cleanup(func() { assert.NoError(t, source.Close()) }) + + logEntries, err := source.ParseLogEntries() require.NoError(t, err) assert.NotEmpty(t, logEntries) }) + + t.Run("failed", func(t *testing.T) { + t.Parallel() + + reader := iotest.ErrReader(semerr.Error("test")) + + source, err := source.Reader(reader, config.GetDefaultConfig()) + require.NoError(t, err) + + t.Cleanup(func() { assert.NoError(t, source.Close()) }) + + _, err = source.ParseLogEntries() + require.Error(t, err) + }) } func TestParseLogEntriesFromReaderLimited(t *testing.T) { @@ -62,3 +85,49 @@ func TestParseLogEntriesFromReaderLimited(t *testing.T) { require.Empty(t, logEntries.Entries) } + +func TestRow(t *testing.T) { + t.Parallel() + + entry := t.Name() + "\n" + + input := bytes.NewReader([]byte(entry)) + + cfg := config.GetDefaultConfig() + + source, err := source.Reader(input, cfg) + require.NoError(t, err) + + t.Cleanup(func() { assert.NoError(t, source.Close()) }) + + lazyEntries, err := source.ParseLogEntries() + require.NoError(t, err) + + assert.Equal(t, 1, lazyEntries.Len()) + + row := lazyEntries.Row(cfg, 0) + assert.Contains(t, row, entry) +} + +func TestFile(t *testing.T) { + t.Parallel() + + cfg := config.GetDefaultConfig() + fileName := tests.RequireCreateFile(t, []byte(t.Name()+"\n")) + + t.Run("success", func(t *testing.T) { + t.Parallel() + + source, err := source.File(fileName, cfg) + require.NoError(t, err) + + assert.True(t, source.CanFollow()) + }) + + t.Run("not_found", func(t *testing.T) { + t.Parallel() + + _, err := source.File(fileName+"-not-found", cfg) + require.Error(t, err) + }) +} diff --git a/internal/pkg/source/sreamer.go b/internal/pkg/source/steamer.go similarity index 55% rename from internal/pkg/source/sreamer.go rename to internal/pkg/source/steamer.go index 69cb2c3..4c1ec0f 100644 --- a/internal/pkg/source/sreamer.go +++ b/internal/pkg/source/steamer.go @@ -8,19 +8,26 @@ import ( "time" ) -const InitialLogSize int = 1000 +const ( + // initialLogSize is a capacity of a slice with logs. + initialLogSize int = 1000 -func (is *Source) StartStreaming(ctx context.Context, send func(msg LazyLogEntries, err error)) { + // RefreshInterval is an interval of refreshing logs. + RefreshInterval = 200 * time.Millisecond +) + +// StartStreaming synchronizes log entries with the file and sends them to the channel. +func (s *Source) StartStreaming(ctx context.Context, send func(msg LazyLogEntries, err error)) { logEntriesLock := sync.Mutex{} - logEntries := make([]LazyLogEntry, 0, InitialLogSize) + logEntries := make([]LazyLogEntry, 0, initialLogSize) eofEvent := make(chan struct{}, 1) // Load log entries async.. - go is.readLogEntries(ctx, send, &logEntriesLock, &logEntries, eofEvent) + go s.readLogEntries(ctx, send, &logEntriesLock, &logEntries, eofEvent) // periodically send new log entries to the program. go func() { - ticker := time.NewTicker(200 * time.Millisecond) + ticker := time.NewTicker(RefreshInterval) lastLen := -1 defer ticker.Stop() @@ -28,14 +35,15 @@ func (is *Source) StartStreaming(ctx context.Context, send func(msg LazyLogEntri // Only send log update the program state every ticker seconds, // to avoid stressing the main loop. logEntriesLock.Lock() - entries := logEntries + logEntriesClone := make([]LazyLogEntry, len(logEntries)) + copy(logEntriesClone, logEntries) logEntriesLock.Unlock() - nextLen := len(entries) + nextLen := len(logEntriesClone) if lastLen != nextLen { send(LazyLogEntries{ - Seeker: is.Seeker, - Entries: entries, + Seeker: s.Seeker, + Entries: logEntriesClone, }, nil) lastLen = nextLen } @@ -44,7 +52,6 @@ func (is *Source) StartStreaming(ctx context.Context, send func(msg LazyLogEntri for { select { case <-ctx.Done(): - return case <-eofEvent: sendUpdates() @@ -57,7 +64,13 @@ func (is *Source) StartStreaming(ctx context.Context, send func(msg LazyLogEntri }() } -func (is *Source) readLogEntries(ctx context.Context, send func(msg LazyLogEntries, err error), logEntriesLock *sync.Mutex, logEntries *[]LazyLogEntry, eofEvent chan struct{}) { +func (s *Source) readLogEntries( + ctx context.Context, + send func(msg LazyLogEntries, err error), + logEntriesLock *sync.Mutex, + logEntries *[]LazyLogEntry, + eofEvent chan struct{}, +) { defer func() { eofEvent <- struct{}{} }() @@ -69,16 +82,16 @@ func (is *Source) readLogEntries(ctx context.Context, send func(msg LazyLogEntri default: } - entry, err := is.ReadLogEntry() + entry, err := s.readLogEntry() if err != nil { if errors.Is(err, io.EOF) { - if !is.CanFollow() { + if !s.CanFollow() { return } // wait for new log entries to be written to the file, // and try again. - ticker := time.NewTicker(200 * time.Millisecond) + ticker := time.NewTicker(RefreshInterval) select { case <-ctx.Done(): ticker.Stop() @@ -92,6 +105,7 @@ func (is *Source) readLogEntries(ctx context.Context, send func(msg LazyLogEntri send(LazyLogEntries{}, err) return } + logEntriesLock.Lock() *logEntries = append(*logEntries, entry) logEntriesLock.Unlock() diff --git a/internal/pkg/source/streamer_test.go b/internal/pkg/source/streamer_test.go new file mode 100644 index 0000000..9a840ff --- /dev/null +++ b/internal/pkg/source/streamer_test.go @@ -0,0 +1,195 @@ +package source_test + +import ( + "bytes" + "context" + "fmt" + "io" + "os" + "testing" + "time" + + "github.com/hedhyw/json-log-viewer/internal/pkg/config" + "github.com/hedhyw/json-log-viewer/internal/pkg/source" + "github.com/hedhyw/json-log-viewer/internal/pkg/tests" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +const testDelay = source.RefreshInterval * 2 + +func TestStartStreamingEndOfFile(t *testing.T) { + t.Parallel() + + entry := t.Name() + "\n" + + input := bytes.NewReader([]byte(entry)) + + cfg := config.GetDefaultConfig() + + inputSource, err := source.Reader(input, cfg) + require.NoError(t, err) + + t.Cleanup(func() { assert.NoError(t, inputSource.Close()) }) + + ctx, cancel := context.WithCancel(tests.Context(t)) + + entries := make(chan source.LazyLogEntries) + + inputSource.StartStreaming(ctx, func(msg source.LazyLogEntries, err error) { + require.NoError(t, err) + + select { + case entries <- msg: + case <-ctx.Done(): + } + }) + + select { + case msg := <-entries: + cancel() + + require.Equal(t, 1, msg.Len()) + assert.Contains(t, msg.Row(cfg, 0), entry) + case <-ctx.Done(): + t.Fatal(ctx.Err()) + } +} + +func TestStartStreamingUpdates(t *testing.T) { + t.Parallel() + + entry := t.Name() + "\n" + + pipeReader, pipeWriter := io.Pipe() + + t.Cleanup(func() { + assert.NoError(t, pipeReader.Close()) + assert.NoError(t, pipeWriter.Close()) + }) + + cfg := config.GetDefaultConfig() + + inputSource, err := source.Reader(pipeReader, cfg) + require.NoError(t, err) + + t.Cleanup(func() { assert.NoError(t, inputSource.Close()) }) + + ctx := tests.Context(t) + + entries := make(chan source.LazyLogEntries) + + inputSource.StartStreaming(ctx, func(msg source.LazyLogEntries, _ error) { + if msg.Len() == 0 { + return + } + + select { + case entries <- msg: + case <-ctx.Done(): + } + }) + + for i := range 2 { + _, err = fmt.Fprintln(pipeWriter, entry) + require.NoError(t, err) + + select { + case msg := <-entries: + require.Equalf(t, i+1, msg.Len(), "iteration %d", i) + assert.Containsf(t, msg.Row(cfg, 0), entry, "iteration %d", i) + case <-ctx.Done(): + t.Fatal(ctx.Err()) + } + + select { + case <-time.After(testDelay): + case <-ctx.Done(): + t.Fatal(ctx.Err()) + } + } +} + +func TestStartStreamingContextClosed(t *testing.T) { + t.Parallel() + + pipeReader, pipeWriter := io.Pipe() + + t.Cleanup(func() { + assert.NoError(t, pipeReader.Close()) + assert.NoError(t, pipeWriter.Close()) + }) + + cfg := config.GetDefaultConfig() + + inputSource, err := source.Reader(pipeReader, cfg) + require.NoError(t, err) + + t.Cleanup(func() { assert.NoError(t, inputSource.Close()) }) + + ctx, cancel := context.WithCancel(tests.Context(t)) + defer cancel() + + inputSource.StartStreaming(ctx, func(source.LazyLogEntries, error) {}) + + cancel() + + <-time.After(2 * source.RefreshInterval) +} + +func TestStartStreamingFromFile(t *testing.T) { + t.Parallel() + + entry := t.Name() + "\n" + + fileName := tests.RequireCreateFile(t, []byte("")) + + cfg := config.GetDefaultConfig() + + inputSource, err := source.File(fileName, cfg) + require.NoError(t, err) + + t.Cleanup(func() { assert.NoError(t, inputSource.Close()) }) + + ctx := tests.Context(t) + + entries := make(chan source.LazyLogEntries) + + inputSource.StartStreaming(ctx, func(msg source.LazyLogEntries, _ error) { + if msg.Len() == 0 { + return + } + + select { + case entries <- msg: + case <-ctx.Done(): + } + }) + + file, err := os.OpenFile(fileName, os.O_WRONLY, os.ModePerm) + require.NoError(t, err) + + t.Cleanup(func() { assert.NoError(t, file.Close()) }) + + for i := range 2 { + _, err = fmt.Fprintln(file, entry) + require.NoError(t, err) + + require.NoError(t, file.Sync()) + + select { + case msg := <-entries: + require.Equalf(t, i+1, msg.Len(), "iteration %d", i) + assert.Containsf(t, msg.Row(cfg, 0), entry, "iteration %d", i) + case <-ctx.Done(): + t.Fatal(ctx.Err()) + } + + select { + case <-time.After(testDelay): + case <-ctx.Done(): + t.Fatal(ctx.Err()) + } + } +} diff --git a/internal/pkg/tests/tests.go b/internal/pkg/tests/tests.go index e538b90..84529f5 100644 --- a/internal/pkg/tests/tests.go +++ b/internal/pkg/tests/tests.go @@ -1,9 +1,11 @@ package tests import ( + "context" "encoding/json" "os" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -41,3 +43,20 @@ func RequireEncodeJSON(tb testing.TB, value any) []byte { return content } + +// Context returns a test context with timeout. +func Context(t *testing.T) context.Context { + t.Helper() + + const defaultTimeout = time.Minute + + deadline, ok := t.Deadline() + if !ok { + deadline = time.Now().Add(defaultTimeout) + } + + ctx, cancel := context.WithDeadline(context.Background(), deadline) + t.Cleanup(cancel) + + return ctx +}