diff --git a/pkg/stanza/adapter/converter.go b/pkg/stanza/adapter/converter.go index 3ab508745bc3..a81fd8f00a42 100644 --- a/pkg/stanza/adapter/converter.go +++ b/pkg/stanza/adapter/converter.go @@ -4,162 +4,19 @@ package adapter // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/adapter" import ( - "context" "encoding/binary" "encoding/json" - "errors" "fmt" - "math" - "runtime" "sort" "sync" "github.com/cespare/xxhash/v2" - "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" - "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry" ) -// Converter converts a batch of entry.Entry into plog.Logs aggregating translated -// entries into logs coming from the same Resource. -// -// The diagram below illustrates the internal communication inside the Converter: -// -// ┌─────────────────────────────────┐ -// │ Batch() │ -// ┌─────────┤ Ingests batches of log entries │ -// │ │ and sends them onto workerChan │ -// │ └─────────────────────────────────┘ -// │ -// │ ┌───────────────────────────────────────────────────┐ -// ├─► workerLoop() │ -// │ │ ┌─────────────────────────────────────────────────┴─┐ -// ├─┼─► workerLoop() │ -// │ │ │ ┌─────────────────────────────────────────────────┴─┐ -// └─┼─┼─► workerLoop() │ -// └─┤ │ consumes sent log entries from workerChan, │ -// │ │ translates received entries to plog.LogRecords, │ -// └─┤ and sends them on flushChan │ -// └─────────────────────────┬─────────────────────────┘ -// │ -// ▼ -// ┌─────────────────────────────────────────────────────┐ -// │ flushLoop() │ -// │ receives log records from flushChan and sends │ -// │ them onto pLogsChan which is consumed by │ -// │ downstream consumers via OutChannel() │ -// └─────────────────────────────────────────────────────┘ -type Converter struct { - set component.TelemetrySettings - - // pLogsChan is a channel on which aggregated logs will be sent to. - pLogsChan chan plog.Logs - - stopOnce sync.Once - - // converterChan is an internal communication channel signaling stop was called - // prevents sending to closed channels - converterChan chan struct{} - - // workerChan is an internal communication channel that gets the log - // entries from Batch() calls and it receives the data in workerLoop(). - workerChan chan []*entry.Entry - // workerCount configures the amount of workers started. - workerCount int - - // flushChan is an internal channel used for transporting batched plog.Logs. - flushChan chan plog.Logs - - // wg is a WaitGroup that makes sure that we wait for spun up goroutines exit - // when Stop() is called. - wg sync.WaitGroup - - // flushWg is a WaitGroup that makes sure that we wait for flush loop to exit - // when Stop() is called. - flushWg sync.WaitGroup -} - -type converterOption interface { - apply(*Converter) -} - -func withWorkerCount(workerCount int) converterOption { - return workerCountOption{workerCount} -} - -type workerCountOption struct { - workerCount int -} - -func (o workerCountOption) apply(c *Converter) { - c.workerCount = o.workerCount -} - -func NewConverter(set component.TelemetrySettings, opts ...converterOption) *Converter { - set.Logger = set.Logger.With(zap.String("component", "converter")) - c := &Converter{ - set: set, - workerChan: make(chan []*entry.Entry), - workerCount: int(math.Max(1, float64(runtime.NumCPU()/4))), - pLogsChan: make(chan plog.Logs), - converterChan: make(chan struct{}), - flushChan: make(chan plog.Logs), - } - for _, opt := range opts { - opt.apply(c) - } - return c -} - -func (c *Converter) Start() { - c.set.Logger.Debug("Starting log converter", zap.Int("worker_count", c.workerCount)) - - c.wg.Add(c.workerCount) - for i := 0; i < c.workerCount; i++ { - go c.workerLoop() - } - - c.flushWg.Add(1) - go c.flushLoop() -} - -func (c *Converter) Stop() { - c.stopOnce.Do(func() { - close(c.converterChan) - - // close workerChan and wait for entries to be processed - close(c.workerChan) - c.wg.Wait() - - // close flushChan and wait for flush loop to finish - close(c.flushChan) - c.flushWg.Wait() - - // close pLogsChan so callers can stop processing - close(c.pLogsChan) - }) -} - -// OutChannel returns the channel on which converted entries will be sent to. -func (c *Converter) OutChannel() <-chan plog.Logs { - return c.pLogsChan -} - -// workerLoop is responsible for obtaining log entries from Batch() calls, -// converting them to plog.LogRecords batched by Resource, and sending them -// on flushChan. -func (c *Converter) workerLoop() { - defer c.wg.Done() - - for entries := range c.workerChan { - // Send plogs directly to flushChan - c.flushChan <- ConvertEntries(entries) - } -} - func ConvertEntries(entries []*entry.Entry) plog.Logs { resourceHashToIdx := make(map[uint64]int) scopeIdxByResource := make(map[uint64]map[string]int) @@ -197,47 +54,6 @@ func ConvertEntries(entries []*entry.Entry) plog.Logs { return pLogs } -func (c *Converter) flushLoop() { - defer c.flushWg.Done() - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - for pLogs := range c.flushChan { - if err := c.flush(ctx, pLogs); err != nil { - c.set.Logger.Debug("Problem sending log entries", - zap.Error(err), - ) - } - } -} - -// flush flushes provided plog.Logs entries onto a channel. -func (c *Converter) flush(ctx context.Context, pLogs plog.Logs) error { - doneChan := ctx.Done() - - select { - case <-doneChan: - return fmt.Errorf("flushing log entries interrupted, err: %w", ctx.Err()) - - case c.pLogsChan <- pLogs: - } - - return nil -} - -// Batch takes in an entry.Entry and sends it to an available worker for processing. -func (c *Converter) Batch(e []*entry.Entry) error { - // in case Stop was called do not process batch - select { - case <-c.converterChan: - return errors.New("logs converter has been stopped") - default: - } - - c.workerChan <- e - return nil -} - // convert converts one entry.Entry into plog.LogRecord allocating it. func convert(ent *entry.Entry) plog.LogRecord { dest := plog.NewLogRecord() diff --git a/pkg/stanza/adapter/converter_test.go b/pkg/stanza/adapter/converter_test.go index a56a21f94eb2..a527e3bddb84 100644 --- a/pkg/stanza/adapter/converter_test.go +++ b/pkg/stanza/adapter/converter_test.go @@ -4,20 +4,15 @@ package adapter import ( - "context" "fmt" "sort" "strconv" - "sync" "testing" "time" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" - "go.uber.org/zap/zaptest" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry" ) @@ -42,10 +37,6 @@ func BenchmarkConvertComplex(b *testing.B) { } } -func complexEntries(count int) []*entry.Entry { - return complexEntriesForNDifferentHosts(count, 1) -} - func complexEntriesForNDifferentHosts(count int, n int) []*entry.Entry { ret := make([]*entry.Entry, count) for i := 0; i < count; i++ { @@ -392,163 +383,25 @@ func TestAllConvertedEntriesScopeGrouping(t *testing.T) { t.Run(strconv.Itoa(i), func(t *testing.T) { t.Parallel() - set := componenttest.NewNopTelemetrySettings() - set.Logger = zaptest.NewLogger(t) - converter := NewConverter(set) - converter.Start() - defer converter.Stop() - - go func() { - entries := complexEntriesForNDifferentHostsMDifferentScopes(100, 1, tc.numberOFScopes) - assert.NoError(t, converter.Batch(entries)) - }() - - var ( - timeoutTimer = time.NewTimer(10 * time.Second) - ch = converter.OutChannel() - ) - defer timeoutTimer.Stop() - - select { - case pLogs, ok := <-ch: - if !ok { - break - } + entries := complexEntriesForNDifferentHostsMDifferentScopes(100, 1, tc.numberOFScopes) - rLogs := pLogs.ResourceLogs() - rLog := rLogs.At(0) + pLogs := ConvertEntries(entries) - ills := rLog.ScopeLogs() - require.Equal(t, ills.Len(), tc.numberOFScopes) + rLogs := pLogs.ResourceLogs() + rLog := rLogs.At(0) - for i := 0; i < tc.numberOFScopes; i++ { - sl := ills.At(i) - require.Equal(t, sl.Scope().Name(), fmt.Sprintf("scope-%d", i%tc.numberOFScopes)) - require.Equal(t, sl.LogRecords().Len(), tc.logsPerScope) - } + ills := rLog.ScopeLogs() + require.Equal(t, ills.Len(), tc.numberOFScopes) - case <-timeoutTimer.C: - break + for i := 0; i < tc.numberOFScopes; i++ { + sl := ills.At(i) + require.Equal(t, sl.Scope().Name(), fmt.Sprintf("scope-%d", i%tc.numberOFScopes)) + require.Equal(t, sl.LogRecords().Len(), tc.logsPerScope) } }) } } -func TestAllConvertedEntriesAreSentAndReceived(t *testing.T) { - t.Parallel() - - testcases := []struct { - entries int - maxFlushCount uint - }{ - { - entries: 10, - maxFlushCount: 10, - }, - { - entries: 10, - maxFlushCount: 3, - }, - { - entries: 100, - maxFlushCount: 20, - }, - } - - for i, tc := range testcases { - tc := tc - - t.Run(strconv.Itoa(i), func(t *testing.T) { - t.Parallel() - - set := componenttest.NewNopTelemetrySettings() - set.Logger = zaptest.NewLogger(t) - converter := NewConverter(set) - converter.Start() - defer converter.Stop() - - go func() { - entries := complexEntries(tc.entries) - for from := 0; from < tc.entries; from += int(tc.maxFlushCount) { - to := from + int(tc.maxFlushCount) - if to > tc.entries { - to = tc.entries - } - assert.NoError(t, converter.Batch(entries[from:to])) - } - }() - - var ( - actualCount int - timeoutTimer = time.NewTimer(10 * time.Second) - ch = converter.OutChannel() - ) - defer timeoutTimer.Stop() - - forLoop: - for { - if tc.entries == actualCount { - break - } - - select { - case pLogs, ok := <-ch: - if !ok { - break forLoop - } - - rLogs := pLogs.ResourceLogs() - require.Equal(t, 1, rLogs.Len()) - - rLog := rLogs.At(0) - ills := rLog.ScopeLogs() - require.Equal(t, 1, ills.Len()) - - sl := ills.At(0) - - actualCount += sl.LogRecords().Len() - - assert.LessOrEqual(t, uint(sl.LogRecords().Len()), tc.maxFlushCount, - "Received more log records in one flush than configured by maxFlushCount", - ) - - case <-timeoutTimer.C: - break forLoop - } - } - - assert.Equal(t, tc.entries, actualCount, - "didn't receive expected number of entries after conversion", - ) - }) - } -} - -func TestConverterCancelledContextCancellsTheFlush(t *testing.T) { - set := componenttest.NewNopTelemetrySettings() - set.Logger = zaptest.NewLogger(t) - converter := NewConverter(set) - converter.Start() - defer converter.Stop() - var wg sync.WaitGroup - wg.Add(1) - - ctx, cancel := context.WithCancel(context.Background()) - cancel() - - go func() { - defer wg.Done() - pLogs := plog.NewLogs() - ills := pLogs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty() - - lr := convert(complexEntry()) - lr.CopyTo(ills.LogRecords().AppendEmpty()) - - assert.Error(t, converter.flush(ctx, pLogs)) - }() - wg.Wait() -} - func TestConvertMetadata(t *testing.T) { now := time.Now() @@ -946,55 +799,17 @@ func BenchmarkConverter(b *testing.B) { for _, wc := range workerCounts { b.Run(fmt.Sprintf("worker_count=%d", wc), func(b *testing.B) { for i := 0; i < b.N; i++ { - set := componenttest.NewNopTelemetrySettings() - set.Logger = zaptest.NewLogger(b) - converter := NewConverter(set, withWorkerCount(wc)) - converter.Start() - defer converter.Stop() - b.ReportAllocs() - go func() { - for from := 0; from < entryCount; from += int(batchSize) { - to := from + int(batchSize) - if to > entryCount { - to = entryCount - } - assert.NoError(b, converter.Batch(entries[from:to])) - } - }() - - var ( - timeoutTimer = time.NewTimer(10 * time.Second) - ch = converter.OutChannel() - ) - defer timeoutTimer.Stop() - - var n int - forLoop: - for { - if n == entryCount { - break - } - - select { - case pLogs, ok := <-ch: - if !ok { - break forLoop - } - - rLogs := pLogs.ResourceLogs() - require.Equal(b, hostsCount, rLogs.Len()) - n += pLogs.LogRecordCount() - - case <-timeoutTimer.C: - break forLoop + for from := 0; from < entryCount; from += int(batchSize) { + to := from + int(batchSize) + if to > entryCount { + to = entryCount } + pLogs := ConvertEntries(entries[from:to]) + rLogs := pLogs.ResourceLogs() + require.Equal(b, hostsCount, rLogs.Len()) } - - assert.Equal(b, entryCount, n, - "didn't receive expected number of entries after conversion", - ) } }) } diff --git a/receiver/filelogreceiver/filelog_test.go b/receiver/filelogreceiver/filelog_test.go index 6d9cb8f7f577..a49a7d780286 100644 --- a/receiver/filelogreceiver/filelog_test.go +++ b/receiver/filelogreceiver/filelog_test.go @@ -12,7 +12,6 @@ import ( "path/filepath" "runtime" "strconv" - "sync" "sync/atomic" "testing" "time" @@ -28,6 +27,7 @@ import ( "go.opentelemetry.io/collector/receiver/receivertest" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/consumerretry" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/plogtest" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/adapter" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" @@ -83,44 +83,50 @@ func TestReadStaticFile(t *testing.T) { sink := new(consumertest.LogsSink) cfg := testdataConfigYaml() - converter := adapter.NewConverter(componenttest.NewNopTelemetrySettings()) - converter.Start() - defer converter.Stop() - - var wg sync.WaitGroup - wg.Add(1) - go consumeNLogsFromConverter(converter.OutChannel(), 3, &wg) - rcvr, err := f.CreateLogs(context.Background(), receivertest.NewNopSettings(), cfg, sink) require.NoError(t, err, "failed to create receiver") require.NoError(t, rcvr.Start(context.Background(), componenttest.NewNopHost())) + expectedLogs := []plog.Logs{} // Build the expected set by using adapter.Converter to translate entries // to pdata Logs. - queueEntry := func(t *testing.T, c *adapter.Converter, msg string, severity entry.Severity) { + entries := []*entry.Entry{} + queueEntry := func(msg string, severity entry.Severity) { e := entry.New() e.Timestamp = expectedTimestamp - require.NoError(t, e.Set(entry.NewBodyField("msg"), msg)) + e.Body = fmt.Sprintf("2020-08-25 %s %s", severity.String(), msg) e.Severity = severity - e.AddAttribute("file_name", "simple.log") - require.NoError(t, c.Batch([]*entry.Entry{e})) + e.AddAttribute("log.file.name", "simple.log") + e.AddAttribute("time", "2020-08-25") + e.AddAttribute("sev", severity.String()) + e.AddAttribute("msg", msg) + entries = append(entries, e) } - queueEntry(t, converter, "Something routine", entry.Info) - queueEntry(t, converter, "Something bad happened!", entry.Error) - queueEntry(t, converter, "Some details...", entry.Debug) + queueEntry("Something routine", entry.Info) + queueEntry("Something bad happened!", entry.Error) + queueEntry("Some details...", entry.Debug) + + expectedLogs = append(expectedLogs, adapter.ConvertEntries(entries)) dir, err := os.Getwd() require.NoError(t, err) t.Logf("Working Directory: %s", dir) - wg.Wait() - require.Eventually(t, expectNLogs(sink, 3), 2*time.Second, 5*time.Millisecond, "expected %d but got %d logs", 3, sink.LogRecordCount(), ) - // TODO: Figure out a nice way to assert each logs entry content. - // require.Equal(t, expectedLogs, sink.AllLogs()) + + for i, expectedLog := range expectedLogs { + require.NoError(t, + plogtest.CompareLogs( + expectedLog, + sink.AllLogs()[i], + plogtest.IgnoreObservedTimestamp(), + plogtest.IgnoreTimestamp(), + ), + ) + } require.NoError(t, rcvr.Shutdown(context.Background())) } @@ -168,15 +174,6 @@ func (rt *rotationTest) Run(t *testing.T) { fileName := filepath.Join(tempDir, "test.log") backupFileName := filepath.Join(tempDir, "test-backup.log") - // Build expected outputs - expectedTimestamp, _ := time.ParseInLocation("2006-01-02", "2020-08-25", time.Local) - converter := adapter.NewConverter(componenttest.NewNopTelemetrySettings()) - converter.Start() - - var wg sync.WaitGroup - wg.Add(1) - go consumeNLogsFromConverter(converter.OutChannel(), numLogs, &wg) - rcvr, err := f.CreateLogs(context.Background(), receivertest.NewNopSettings(), cfg, sink) require.NoError(t, err, "failed to create receiver") require.NoError(t, rcvr.Start(context.Background(), componenttest.NewNopHost())) @@ -224,40 +221,20 @@ func (rt *rotationTest) Run(t *testing.T) { msg := fmt.Sprintf("This is a simple log line with the number %3d", i) - // Build the expected set by converting entries to pdata Logs... - e := entry.New() - e.Timestamp = expectedTimestamp - require.NoError(t, e.Set(entry.NewBodyField("msg"), msg)) - require.NoError(t, converter.Batch([]*entry.Entry{e})) - // ... and write the logs lines to the actual file consumed by receiver. _, err := file.WriteString(fmt.Sprintf("2020-08-25 %s\n", msg)) require.NoError(t, err) time.Sleep(time.Millisecond) } - wg.Wait() require.Eventually(t, expectNLogs(sink, numLogs), 2*time.Second, 10*time.Millisecond, "expected %d but got %d logs", numLogs, sink.LogRecordCount(), ) + // TODO: Figure out a nice way to assert each logs entry content. // require.Equal(t, expectedLogs, sink.AllLogs()) require.NoError(t, rcvr.Shutdown(context.Background())) - converter.Stop() -} - -func consumeNLogsFromConverter(ch <-chan plog.Logs, count int, wg *sync.WaitGroup) { - defer wg.Done() - - n := 0 - for pLog := range ch { - n += pLog.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().Len() - - if n == count { - return - } - } } func expectNLogs(sink *consumertest.LogsSink, expected int) func() bool { diff --git a/receiver/filelogreceiver/go.mod b/receiver/filelogreceiver/go.mod index 0b83f6d48fb5..43e705cd1e03 100644 --- a/receiver/filelogreceiver/go.mod +++ b/receiver/filelogreceiver/go.mod @@ -19,6 +19,7 @@ require ( ) require ( + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.115.0 go.opentelemetry.io/collector/component/componenttest v0.115.0 go.opentelemetry.io/collector/consumer/consumertest v0.115.0 go.opentelemetry.io/collector/pipeline v0.115.0 @@ -51,6 +52,7 @@ require ( github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.115.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/valyala/fastjson v1.6.4 // indirect go.opentelemetry.io/collector/config/configtelemetry v0.115.0 // indirect diff --git a/receiver/namedpipereceiver/namedpipe_test.go b/receiver/namedpipereceiver/namedpipe_test.go index c7a4f25bea80..a13bc83e28c6 100644 --- a/receiver/namedpipereceiver/namedpipe_test.go +++ b/receiver/namedpipereceiver/namedpipe_test.go @@ -55,10 +55,6 @@ func TestReadPipe(t *testing.T) { sink := new(consumertest.LogsSink) cfg := testdataConfigYaml() - converter := adapter.NewConverter(componenttest.NewNopTelemetrySettings()) - converter.Start() - defer converter.Stop() - rcvr, err := f.CreateLogs(context.Background(), receivertest.NewNopSettings(), cfg, sink) require.NoError(t, err, "failed to create receiver") require.NoError(t, rcvr.Start(context.Background(), componenttest.NewNopHost()))