From f24f54c5fd815b9f2ccc3e2161b8e28c3c2edccb Mon Sep 17 00:00:00 2001 From: Bogdan Drutu Date: Mon, 21 Oct 2024 10:58:10 -0700 Subject: [PATCH] [chore] Small nits in batch processor, use generics to avoid type conversions Signed-off-by: Bogdan Drutu --- processor/batchprocessor/batch_processor.go | 220 +++++++++--------- .../batchprocessor/batch_processor_test.go | 40 ++-- processor/batchprocessor/factory.go | 6 +- 3 files changed, 137 insertions(+), 129 deletions(-) diff --git a/processor/batchprocessor/batch_processor.go b/processor/batchprocessor/batch_processor.go index 190fcc109b4..2d715542dc7 100644 --- a/processor/batchprocessor/batch_processor.go +++ b/processor/batchprocessor/batch_processor.go @@ -37,7 +37,7 @@ var errTooManyBatchers = consumererror.NewPermanent(errors.New("too many batcher // Batches are sent out with any of the following conditions: // - batch size reaches cfg.SendBatchSize // - cfg.Timeout is elapsed since the timestamp when the previous batch was sent out. -type batchProcessor struct { +type batchProcessor[T any] struct { logger *zap.Logger timeout time.Duration sendBatchSize int @@ -45,7 +45,7 @@ type batchProcessor struct { // batchFunc is a factory for new batch objects corresponding // with the appropriate signal. - batchFunc func() batch + batchFunc func() batch[T] shutdownC chan struct{} goroutines sync.WaitGroup @@ -53,16 +53,16 @@ type batchProcessor struct { telemetry *batchProcessorTelemetry // batcher will be either *singletonBatcher or *multiBatcher - batcher batcher + batcher batcher[T] } // batcher is describes a *singletonBatcher or *multiBatcher. -type batcher interface { +type batcher[T any] interface { // start initializes background resources used by this batcher. start(ctx context.Context) error // consume incorporates a new item of data into the pending batch. - consume(ctx context.Context, data any) error + consume(ctx context.Context, data T) error // currentMetadataCardinality returns the number of shards. currentMetadataCardinality() int @@ -71,10 +71,10 @@ type batcher interface { // shard is a single instance of the batch logic. When metadata // keys are in use, one of these is created per distinct combination // of values. -type shard struct { +type shard[T any] struct { // processor refers to this processor, for access to common // configuration. - processor *batchProcessor + processor *batchProcessor[T] // exportCtx is a context with the metadata key-values // corresponding with this shard set. @@ -84,44 +84,40 @@ type shard struct { timer *time.Timer // newItem is used to receive data items from producers. - newItem chan any + newItem chan T // batch is an in-flight data item containing one of the // underlying data types. - batch batch + batch batch[T] } // batch is an interface generalizing the individual signal types. -type batch interface { +type batch[T any] interface { // export the current batch - export(ctx context.Context, req any) error + export(ctx context.Context, req T) error // splitBatch returns a full request built from pending items. - splitBatch(ctx context.Context, sendBatchMaxSize int) (sentBatchSize int, req any) + splitBatch(ctx context.Context, sendBatchMaxSize int) (sentBatchSize int, req T) // itemCount returns the size of the current batch itemCount() int // add item to the current batch - add(item any) + add(item T) // sizeBytes counts the OTLP encoding size of the batch - sizeBytes(item any) int + sizeBytes(item T) int } -var _ consumer.Traces = (*batchProcessor)(nil) -var _ consumer.Metrics = (*batchProcessor)(nil) -var _ consumer.Logs = (*batchProcessor)(nil) - // newBatchProcessor returns a new batch processor component. -func newBatchProcessor(set processor.Settings, cfg *Config, batchFunc func() batch) (*batchProcessor, error) { +func newBatchProcessor[T any](set processor.Settings, cfg *Config, batchFunc func() batch[T]) (*batchProcessor[T], error) { // use lower-case, to be consistent with http/2 headers. mks := make([]string, len(cfg.MetadataKeys)) for i, k := range cfg.MetadataKeys { mks[i] = strings.ToLower(k) } sort.Strings(mks) - bp := &batchProcessor{ + bp := &batchProcessor[T]{ logger: set.Logger, sendBatchSize: int(cfg.SendBatchSize), @@ -131,11 +127,11 @@ func newBatchProcessor(set processor.Settings, cfg *Config, batchFunc func() bat shutdownC: make(chan struct{}, 1), } if len(mks) == 0 { - bp.batcher = &singleShardBatcher{ + bp.batcher = &singleShardBatcher[T]{ processor: bp, } } else { - bp.batcher = &multiShardBatcher{ + bp.batcher = &multiShardBatcher[T]{ metadataKeys: mks, metadataLimit: int(cfg.MetadataCardinalityLimit), processor: bp, @@ -152,30 +148,30 @@ func newBatchProcessor(set processor.Settings, cfg *Config, batchFunc func() bat } // newShard gets or creates a batcher corresponding with attrs. -func (bp *batchProcessor) newShard(md map[string][]string) *shard { +func (bp *batchProcessor[T]) newShard(md map[string][]string) *shard[T] { exportCtx := client.NewContext(context.Background(), client.Info{ Metadata: client.NewMetadata(md), }) - b := &shard{ + b := &shard[T]{ processor: bp, - newItem: make(chan any, runtime.NumCPU()), + newItem: make(chan T, runtime.NumCPU()), exportCtx: exportCtx, batch: bp.batchFunc(), } return b } -func (bp *batchProcessor) Capabilities() consumer.Capabilities { +func (bp *batchProcessor[T]) Capabilities() consumer.Capabilities { return consumer.Capabilities{MutatesData: true} } // Start is invoked during service startup. -func (bp *batchProcessor) Start(ctx context.Context, _ component.Host) error { +func (bp *batchProcessor[T]) Start(ctx context.Context, _ component.Host) error { return bp.batcher.start(ctx) } // Shutdown is invoked during service shutdown. -func (bp *batchProcessor) Shutdown(context.Context) error { +func (bp *batchProcessor[T]) Shutdown(context.Context) error { close(bp.shutdownC) // Wait until all goroutines are done. @@ -183,12 +179,12 @@ func (bp *batchProcessor) Shutdown(context.Context) error { return nil } -func (b *shard) start() { +func (b *shard[T]) start() { b.processor.goroutines.Add(1) go b.startLoop() } -func (b *shard) startLoop() { +func (b *shard[T]) startLoop() { defer b.processor.goroutines.Done() // timerCh ensures we only block when there is a @@ -218,9 +214,6 @@ func (b *shard) startLoop() { } return case item := <-b.newItem: - if item == nil { - continue - } b.processItem(item) case <-timerCh: if b.batch.itemCount() > 0 { @@ -231,7 +224,7 @@ func (b *shard) startLoop() { } } -func (b *shard) processItem(item any) { +func (b *shard[T]) processItem(item T) { b.batch.add(item) sent := false for b.batch.itemCount() > 0 && (!b.hasTimer() || b.batch.itemCount() >= b.processor.sendBatchSize) { @@ -245,23 +238,23 @@ func (b *shard) processItem(item any) { } } -func (b *shard) hasTimer() bool { +func (b *shard[T]) hasTimer() bool { return b.timer != nil } -func (b *shard) stopTimer() { +func (b *shard[T]) stopTimer() { if b.hasTimer() && !b.timer.Stop() { <-b.timer.C } } -func (b *shard) resetTimer() { +func (b *shard[T]) resetTimer() { if b.hasTimer() { b.timer.Reset(b.processor.timeout) } } -func (b *shard) sendItems(trigger trigger) { +func (b *shard[T]) sendItems(trigger trigger) { sent, req := b.batch.splitBatch(b.exportCtx, b.processor.sendBatchMaxSize) err := b.batch.export(b.exportCtx, req) @@ -278,28 +271,28 @@ func (b *shard) sendItems(trigger trigger) { // singleShardBatcher is used when metadataKeys is empty, to avoid the // additional lock and map operations used in multiBatcher. -type singleShardBatcher struct { - processor *batchProcessor - single *shard +type singleShardBatcher[T any] struct { + processor *batchProcessor[T] + single *shard[T] } -func (sb *singleShardBatcher) start(context.Context) error { +func (sb *singleShardBatcher[T]) start(context.Context) error { sb.single = sb.processor.newShard(nil) sb.single.start() return nil } -func (sb *singleShardBatcher) consume(_ context.Context, data any) error { +func (sb *singleShardBatcher[T]) consume(_ context.Context, data T) error { sb.single.newItem <- data return nil } -func (sb *singleShardBatcher) currentMetadataCardinality() int { +func (sb *singleShardBatcher[T]) currentMetadataCardinality() int { return 1 } -// multiBatcher is used when metadataKeys is not empty. -type multiShardBatcher struct { +// multiShardBatcher is used when metadataKeys is not empty. +type multiShardBatcher[T any] struct { // metadataKeys is the configured list of metadata keys. When // empty, the `singleton` batcher is used. When non-empty, // each distinct combination of metadata keys and values @@ -309,7 +302,7 @@ type multiShardBatcher struct { // metadataLimit is the limiting size of the batchers map. metadataLimit int - processor *batchProcessor + processor *batchProcessor[T] batchers sync.Map // Guards the size and the storing logic to ensure no more than limit items are stored. @@ -318,11 +311,11 @@ type multiShardBatcher struct { size int } -func (mb *multiShardBatcher) start(context.Context) error { +func (mb *multiShardBatcher[T]) start(context.Context) error { return nil } -func (mb *multiShardBatcher) consume(ctx context.Context, data any) error { +func (mb *multiShardBatcher[T]) consume(ctx context.Context, data T) error { // Get each metadata key value, form the corresponding // attribute set for use as a map lookup key. info := client.FromContext(ctx) @@ -356,49 +349,72 @@ func (mb *multiShardBatcher) consume(ctx context.Context, data any) error { b, loaded = mb.batchers.LoadOrStore(aset, mb.processor.newShard(md)) if !loaded { // Start the goroutine only if we added the object to the map, otherwise is already started. - b.(*shard).start() + b.(*shard[T]).start() mb.size++ } mb.lock.Unlock() } - b.(*shard).newItem <- data + b.(*shard[T]).newItem <- data return nil } -func (mb *multiShardBatcher) currentMetadataCardinality() int { +func (mb *multiShardBatcher[T]) currentMetadataCardinality() int { mb.lock.Lock() defer mb.lock.Unlock() return mb.size } -// ConsumeTraces implements processor.Traces -func (bp *batchProcessor) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { - return bp.batcher.consume(ctx, td) +type tracesBatchProcessor struct { + *batchProcessor[ptrace.Traces] } -// ConsumeMetrics implements processor.Metrics -func (bp *batchProcessor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { - return bp.batcher.consume(ctx, md) +// newTracesBatchProcessor creates a new batch processor that batches traces by size or with timeout +func newTracesBatchProcessor(set processor.Settings, next consumer.Traces, cfg *Config) (processor.Traces, error) { + bp, err := newBatchProcessor(set, cfg, func() batch[ptrace.Traces] { return newBatchTraces(next) }) + if err != nil { + return nil, err + } + return &tracesBatchProcessor{batchProcessor: bp}, nil } -// ConsumeLogs implements processor.Logs -func (bp *batchProcessor) ConsumeLogs(ctx context.Context, ld plog.Logs) error { - return bp.batcher.consume(ctx, ld) +func (t *tracesBatchProcessor) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { + return t.batcher.consume(ctx, td) +} + +type metricsBatchProcessor struct { + *batchProcessor[pmetric.Metrics] +} + +// newMetricsBatchProcessor creates a new batch processor that batches metrics by size or with timeout +func newMetricsBatchProcessor(set processor.Settings, next consumer.Metrics, cfg *Config) (processor.Metrics, error) { + bp, err := newBatchProcessor(set, cfg, func() batch[pmetric.Metrics] { return newMetricsBatch(next) }) + if err != nil { + return nil, err + } + return &metricsBatchProcessor{batchProcessor: bp}, nil +} + +// ConsumeMetrics implements processor.Metrics +func (m *metricsBatchProcessor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { + return m.batcher.consume(ctx, md) } -// newBatchTraces creates a new batch processor that batches traces by size or with timeout -func newBatchTracesProcessor(set processor.Settings, next consumer.Traces, cfg *Config) (*batchProcessor, error) { - return newBatchProcessor(set, cfg, func() batch { return newBatchTraces(next) }) +type logsBatchProcessor struct { + *batchProcessor[plog.Logs] } -// newBatchMetricsProcessor creates a new batch processor that batches metrics by size or with timeout -func newBatchMetricsProcessor(set processor.Settings, next consumer.Metrics, cfg *Config) (*batchProcessor, error) { - return newBatchProcessor(set, cfg, func() batch { return newBatchMetrics(next) }) +// newLogsBatchProcessor creates a new batch processor that batches logs by size or with timeout +func newLogsBatchProcessor(set processor.Settings, next consumer.Logs, cfg *Config) (processor.Logs, error) { + bp, err := newBatchProcessor(set, cfg, func() batch[plog.Logs] { return newBatchLogs(next) }) + if err != nil { + return nil, err + } + return &logsBatchProcessor{batchProcessor: bp}, nil } -// newBatchLogsProcessor creates a new batch processor that batches logs by size or with timeout -func newBatchLogsProcessor(set processor.Settings, next consumer.Logs, cfg *Config) (*batchProcessor, error) { - return newBatchProcessor(set, cfg, func() batch { return newBatchLogs(next) }) +// ConsumeLogs implements processor.Logs +func (l *logsBatchProcessor) ConsumeLogs(ctx context.Context, ld plog.Logs) error { + return l.batcher.consume(ctx, ld) } type batchTraces struct { @@ -413,8 +429,7 @@ func newBatchTraces(nextConsumer consumer.Traces) *batchTraces { } // add updates current batchTraces by adding new TraceData object -func (bt *batchTraces) add(item any) { - td := item.(ptrace.Traces) +func (bt *batchTraces) add(td ptrace.Traces) { newSpanCount := td.SpanCount() if newSpanCount == 0 { return @@ -424,29 +439,28 @@ func (bt *batchTraces) add(item any) { td.ResourceSpans().MoveAndAppendTo(bt.traceData.ResourceSpans()) } -func (bt *batchTraces) sizeBytes(data any) int { - return bt.sizer.TracesSize(data.(ptrace.Traces)) +func (bt *batchTraces) sizeBytes(td ptrace.Traces) int { + return bt.sizer.TracesSize(td) } -func (bt *batchTraces) export(ctx context.Context, req any) error { - td := req.(ptrace.Traces) +func (bt *batchTraces) export(ctx context.Context, td ptrace.Traces) error { return bt.nextConsumer.ConsumeTraces(ctx, td) } -func (bt *batchTraces) splitBatch(_ context.Context, sendBatchMaxSize int) (int, any) { - var req ptrace.Traces +func (bt *batchTraces) splitBatch(_ context.Context, sendBatchMaxSize int) (int, ptrace.Traces) { + var td ptrace.Traces var sent int if sendBatchMaxSize > 0 && bt.itemCount() > sendBatchMaxSize { - req = splitTraces(sendBatchMaxSize, bt.traceData) + td = splitTraces(sendBatchMaxSize, bt.traceData) bt.spanCount -= sendBatchMaxSize sent = sendBatchMaxSize } else { - req = bt.traceData + td = bt.traceData sent = bt.spanCount bt.traceData = ptrace.NewTraces() bt.spanCount = 0 } - return sent, req + return sent, td } func (bt *batchTraces) itemCount() int { @@ -460,43 +474,40 @@ type batchMetrics struct { sizer pmetric.Sizer } -func newBatchMetrics(nextConsumer consumer.Metrics) *batchMetrics { +func newMetricsBatch(nextConsumer consumer.Metrics) *batchMetrics { return &batchMetrics{nextConsumer: nextConsumer, metricData: pmetric.NewMetrics(), sizer: &pmetric.ProtoMarshaler{}} } -func (bm *batchMetrics) sizeBytes(data any) int { - return bm.sizer.MetricsSize(data.(pmetric.Metrics)) +func (bm *batchMetrics) sizeBytes(md pmetric.Metrics) int { + return bm.sizer.MetricsSize(md) } -func (bm *batchMetrics) export(ctx context.Context, req any) error { - md := req.(pmetric.Metrics) +func (bm *batchMetrics) export(ctx context.Context, md pmetric.Metrics) error { return bm.nextConsumer.ConsumeMetrics(ctx, md) } -func (bm *batchMetrics) splitBatch(_ context.Context, sendBatchMaxSize int) (int, any) { - var req pmetric.Metrics +func (bm *batchMetrics) splitBatch(_ context.Context, sendBatchMaxSize int) (int, pmetric.Metrics) { + var md pmetric.Metrics var sent int if sendBatchMaxSize > 0 && bm.dataPointCount > sendBatchMaxSize { - req = splitMetrics(sendBatchMaxSize, bm.metricData) + md = splitMetrics(sendBatchMaxSize, bm.metricData) bm.dataPointCount -= sendBatchMaxSize sent = sendBatchMaxSize } else { - req = bm.metricData + md = bm.metricData sent = bm.dataPointCount bm.metricData = pmetric.NewMetrics() bm.dataPointCount = 0 } - return sent, req + return sent, md } func (bm *batchMetrics) itemCount() int { return bm.dataPointCount } -func (bm *batchMetrics) add(item any) { - md := item.(pmetric.Metrics) - +func (bm *batchMetrics) add(md pmetric.Metrics) { newDataPointCount := md.DataPointCount() if newDataPointCount == 0 { return @@ -516,39 +527,36 @@ func newBatchLogs(nextConsumer consumer.Logs) *batchLogs { return &batchLogs{nextConsumer: nextConsumer, logData: plog.NewLogs(), sizer: &plog.ProtoMarshaler{}} } -func (bl *batchLogs) sizeBytes(data any) int { - return bl.sizer.LogsSize(data.(plog.Logs)) +func (bl *batchLogs) sizeBytes(ld plog.Logs) int { + return bl.sizer.LogsSize(ld) } -func (bl *batchLogs) export(ctx context.Context, req any) error { - ld := req.(plog.Logs) +func (bl *batchLogs) export(ctx context.Context, ld plog.Logs) error { return bl.nextConsumer.ConsumeLogs(ctx, ld) } -func (bl *batchLogs) splitBatch(_ context.Context, sendBatchMaxSize int) (int, any) { - var req plog.Logs +func (bl *batchLogs) splitBatch(_ context.Context, sendBatchMaxSize int) (int, plog.Logs) { + var ld plog.Logs var sent int if sendBatchMaxSize > 0 && bl.logCount > sendBatchMaxSize { - req = splitLogs(sendBatchMaxSize, bl.logData) + ld = splitLogs(sendBatchMaxSize, bl.logData) bl.logCount -= sendBatchMaxSize sent = sendBatchMaxSize } else { - req = bl.logData + ld = bl.logData sent = bl.logCount bl.logData = plog.NewLogs() bl.logCount = 0 } - return sent, req + return sent, ld } func (bl *batchLogs) itemCount() int { return bl.logCount } -func (bl *batchLogs) add(item any) { - ld := item.(plog.Logs) - +func (bl *batchLogs) add(ld plog.Logs) { newLogsCount := ld.LogRecordCount() if newLogsCount == 0 { return diff --git a/processor/batchprocessor/batch_processor_test.go b/processor/batchprocessor/batch_processor_test.go index 3d2e6571406..e49badeca3d 100644 --- a/processor/batchprocessor/batch_processor_test.go +++ b/processor/batchprocessor/batch_processor_test.go @@ -86,7 +86,7 @@ func TestBatchProcessorSpansDelivered(t *testing.T) { cfg.SendBatchSize = 128 creationSet := processortest.NewNopSettings() creationSet.MetricsLevel = configtelemetry.LevelDetailed - batcher, err := newBatchTracesProcessor(creationSet, sink, cfg) + batcher, err := newTracesBatchProcessor(creationSet, sink, cfg) require.NoError(t, err) require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost())) @@ -129,7 +129,7 @@ func TestBatchProcessorSpansDeliveredEnforceBatchSize(t *testing.T) { cfg.SendBatchMaxSize = 130 creationSet := processortest.NewNopSettings() creationSet.MetricsLevel = configtelemetry.LevelDetailed - batcher, err := newBatchTracesProcessor(creationSet, sink, cfg) + batcher, err := newTracesBatchProcessor(creationSet, sink, cfg) require.NoError(t, err) require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost())) @@ -183,7 +183,7 @@ func TestBatchProcessorSentBySize(t *testing.T) { cfg.Timeout = 500 * time.Millisecond creationSet := tel.NewSettings() creationSet.MetricsLevel = configtelemetry.LevelDetailed - batcher, err := newBatchTracesProcessor(creationSet, sink, cfg) + batcher, err := newTracesBatchProcessor(creationSet, sink, cfg) require.NoError(t, err) require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost())) @@ -304,7 +304,7 @@ func TestBatchProcessorSentBySizeWithMaxSize(t *testing.T) { cfg.Timeout = 500 * time.Millisecond creationSet := tel.NewSettings() creationSet.MetricsLevel = configtelemetry.LevelDetailed - batcher, err := newBatchTracesProcessor(creationSet, sink, cfg) + batcher, err := newTracesBatchProcessor(creationSet, sink, cfg) require.NoError(t, err) require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost())) @@ -442,7 +442,7 @@ func TestBatchProcessorSentByTimeout(t *testing.T) { creationSet := processortest.NewNopSettings() creationSet.MetricsLevel = configtelemetry.LevelDetailed - batcher, err := newBatchTracesProcessor(creationSet, sink, cfg) + batcher, err := newTracesBatchProcessor(creationSet, sink, cfg) require.NoError(t, err) require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost())) @@ -489,7 +489,7 @@ func TestBatchProcessorTraceSendWhenClosing(t *testing.T) { creationSet := processortest.NewNopSettings() creationSet.MetricsLevel = configtelemetry.LevelDetailed - batcher, err := newBatchTracesProcessor(creationSet, sink, &cfg) + batcher, err := newTracesBatchProcessor(creationSet, sink, &cfg) require.NoError(t, err) require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost())) @@ -520,7 +520,7 @@ func TestBatchMetricProcessor_ReceivingData(t *testing.T) { creationSet := processortest.NewNopSettings() creationSet.MetricsLevel = configtelemetry.LevelDetailed - batcher, err := newBatchMetricsProcessor(creationSet, sink, &cfg) + batcher, err := newMetricsBatchProcessor(creationSet, sink, &cfg) require.NoError(t, err) require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost())) @@ -576,7 +576,7 @@ func TestBatchMetricProcessorBatchSize(t *testing.T) { creationSet := tel.NewSettings() creationSet.MetricsLevel = configtelemetry.LevelDetailed - batcher, err := newBatchMetricsProcessor(creationSet, sink, &cfg) + batcher, err := newMetricsBatchProcessor(creationSet, sink, &cfg) require.NoError(t, err) require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost())) @@ -686,7 +686,7 @@ func TestBatchMetrics_UnevenBatchMaxSize(t *testing.T) { dataPointsPerMetric := 2 sendBatchMaxSize := 99 - batchMetrics := newBatchMetrics(sink) + batchMetrics := newMetricsBatch(sink) md := testdata.GenerateMetrics(metricsCount) batchMetrics.add(md) @@ -710,7 +710,7 @@ func TestBatchMetricsProcessor_Timeout(t *testing.T) { creationSet := processortest.NewNopSettings() creationSet.MetricsLevel = configtelemetry.LevelDetailed - batcher, err := newBatchMetricsProcessor(creationSet, sink, &cfg) + batcher, err := newMetricsBatchProcessor(creationSet, sink, &cfg) require.NoError(t, err) require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost())) @@ -759,7 +759,7 @@ func TestBatchMetricProcessor_Shutdown(t *testing.T) { creationSet := processortest.NewNopSettings() creationSet.MetricsLevel = configtelemetry.LevelDetailed - batcher, err := newBatchMetricsProcessor(creationSet, sink, &cfg) + batcher, err := newMetricsBatchProcessor(creationSet, sink, &cfg) require.NoError(t, err) require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost())) @@ -858,7 +858,7 @@ func runMetricsProcessorBenchmark(b *testing.B, cfg Config) { creationSet := processortest.NewNopSettings() creationSet.MetricsLevel = configtelemetry.LevelDetailed metricsPerRequest := 1000 - batcher, err := newBatchMetricsProcessor(creationSet, sink, &cfg) + batcher, err := newMetricsBatchProcessor(creationSet, sink, &cfg) require.NoError(b, err) require.NoError(b, batcher.Start(ctx, componenttest.NewNopHost())) @@ -905,7 +905,7 @@ func TestBatchLogProcessor_ReceivingData(t *testing.T) { creationSet := processortest.NewNopSettings() creationSet.MetricsLevel = configtelemetry.LevelDetailed - batcher, err := newBatchLogsProcessor(creationSet, sink, &cfg) + batcher, err := newLogsBatchProcessor(creationSet, sink, &cfg) require.NoError(t, err) require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost())) @@ -959,7 +959,7 @@ func TestBatchLogProcessor_BatchSize(t *testing.T) { creationSet := tel.NewSettings() creationSet.MetricsLevel = configtelemetry.LevelDetailed - batcher, err := newBatchLogsProcessor(creationSet, sink, &cfg) + batcher, err := newLogsBatchProcessor(creationSet, sink, &cfg) require.NoError(t, err) require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost())) @@ -1073,7 +1073,7 @@ func TestBatchLogsProcessor_Timeout(t *testing.T) { creationSet := processortest.NewNopSettings() creationSet.MetricsLevel = configtelemetry.LevelDetailed - batcher, err := newBatchLogsProcessor(creationSet, sink, &cfg) + batcher, err := newLogsBatchProcessor(creationSet, sink, &cfg) require.NoError(t, err) require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost())) @@ -1122,7 +1122,7 @@ func TestBatchLogProcessor_Shutdown(t *testing.T) { creationSet := processortest.NewNopSettings() creationSet.MetricsLevel = configtelemetry.LevelDetailed - batcher, err := newBatchLogsProcessor(creationSet, sink, &cfg) + batcher, err := newLogsBatchProcessor(creationSet, sink, &cfg) require.NoError(t, err) require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost())) @@ -1201,7 +1201,7 @@ func TestBatchProcessorSpansBatchedByMetadata(t *testing.T) { cfg.MetadataKeys = []string{"token1", "token2"} creationSet := processortest.NewNopSettings() creationSet.MetricsLevel = configtelemetry.LevelDetailed - batcher, err := newBatchTracesProcessor(creationSet, sink, cfg) + batcher, err := newTracesBatchProcessor(creationSet, sink, cfg) require.NoError(t, err) require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost())) @@ -1293,7 +1293,7 @@ func TestBatchProcessorMetadataCardinalityLimit(t *testing.T) { cfg.MetadataKeys = []string{"token"} cfg.MetadataCardinalityLimit = cardLimit creationSet := processortest.NewNopSettings() - batcher, err := newBatchTracesProcessor(creationSet, sink, cfg) + batcher, err := newTracesBatchProcessor(creationSet, sink, cfg) require.NoError(t, err) require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost())) @@ -1336,7 +1336,7 @@ func TestBatchZeroConfig(t *testing.T) { sink := new(consumertest.LogsSink) creationSet := processortest.NewNopSettings() creationSet.MetricsLevel = configtelemetry.LevelDetailed - batcher, err := newBatchLogsProcessor(creationSet, sink, &cfg) + batcher, err := newLogsBatchProcessor(creationSet, sink, &cfg) require.NoError(t, err) require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost())) defer func() { require.NoError(t, batcher.Shutdown(context.Background())) }() @@ -1377,7 +1377,7 @@ func TestBatchSplitOnly(t *testing.T) { sink := new(consumertest.LogsSink) creationSet := processortest.NewNopSettings() creationSet.MetricsLevel = configtelemetry.LevelDetailed - batcher, err := newBatchLogsProcessor(creationSet, sink, &cfg) + batcher, err := newLogsBatchProcessor(creationSet, sink, &cfg) require.NoError(t, err) require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost())) defer func() { require.NoError(t, batcher.Shutdown(context.Background())) }() diff --git a/processor/batchprocessor/factory.go b/processor/batchprocessor/factory.go index 12fcbb9e6ab..2cb52a12c39 100644 --- a/processor/batchprocessor/factory.go +++ b/processor/batchprocessor/factory.go @@ -49,7 +49,7 @@ func createTraces( cfg component.Config, nextConsumer consumer.Traces, ) (processor.Traces, error) { - return newBatchTracesProcessor(set, nextConsumer, cfg.(*Config)) + return newTracesBatchProcessor(set, nextConsumer, cfg.(*Config)) } func createMetrics( @@ -58,7 +58,7 @@ func createMetrics( cfg component.Config, nextConsumer consumer.Metrics, ) (processor.Metrics, error) { - return newBatchMetricsProcessor(set, nextConsumer, cfg.(*Config)) + return newMetricsBatchProcessor(set, nextConsumer, cfg.(*Config)) } func createLogs( @@ -67,5 +67,5 @@ func createLogs( cfg component.Config, nextConsumer consumer.Logs, ) (processor.Logs, error) { - return newBatchLogsProcessor(set, nextConsumer, cfg.(*Config)) + return newLogsBatchProcessor(set, nextConsumer, cfg.(*Config)) }