From 8106ad8bf291e9c1ae99120405d1cdd03305f97f Mon Sep 17 00:00:00 2001 From: Dmitry Anoshin Date: Wed, 20 Dec 2023 04:31:02 -0800 Subject: [PATCH] [exporterhelper] Introduce batching functionality This change introduces new experimental batching functionality to the exporter helper --- .chloggen/batch-exporter-helper.yaml | 25 ++ exporter/exporterhelper/batch_sender.go | 336 ++++++++++++++ exporter/exporterhelper/batch_sender_test.go | 433 +++++++++++++++++++ exporter/exporterhelper/common.go | 27 +- exporter/exporterhelper/queue_sender.go | 6 +- exporter/exporterhelper/request_test.go | 106 ++++- 6 files changed, 922 insertions(+), 11 deletions(-) create mode 100755 .chloggen/batch-exporter-helper.yaml create mode 100644 exporter/exporterhelper/batch_sender.go create mode 100644 exporter/exporterhelper/batch_sender_test.go diff --git a/.chloggen/batch-exporter-helper.yaml b/.chloggen/batch-exporter-helper.yaml new file mode 100755 index 00000000000..613cfbac5b2 --- /dev/null +++ b/.chloggen/batch-exporter-helper.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: exporterhelper + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add experimental batching capabilities to the exporter helper + +# One or more tracking issues or pull requests related to the change +issues: [8122] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] \ No newline at end of file diff --git a/exporter/exporterhelper/batch_sender.go b/exporter/exporterhelper/batch_sender.go new file mode 100644 index 00000000000..f3b362b1a5c --- /dev/null +++ b/exporter/exporterhelper/batch_sender.go @@ -0,0 +1,336 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package exporterhelper // import "go.opentelemetry.io/collector/exporter/exporterhelper" + +import ( + "context" + "errors" + "sync" + "sync/atomic" + "time" + + "go.uber.org/zap" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/exporter" +) + +// MergeBatcherConfig defines a basic configuration for batching requests based on a timeout and a minimum number of +// items. +// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +type MergeBatcherConfig struct { + // Enabled indicates whether to not enqueue batches before sending to the consumerSender. + Enabled bool `mapstructure:"enabled"` + + // Timeout sets the time after which a batch will be sent regardless of its size. + // When this is set to zero, data will be sent immediately. + // This is a recommended option, as it will ensure that the data is sent in a timely manner. + Timeout time.Duration `mapstructure:"timeout"` // Is there a better name to avoid confusion with the consumerSender timeout? + + // MinSizeItems is the number of items (spans, data points or log records for OTLP) at which the batch should be + // sent regardless of the timeout. There is no guarantee that the batch size always greater than this value. + // This option requires the Request to implement RequestItemsCounter interface. Otherwise, it will be ignored. + MinSizeItems int `mapstructure:"min_size_items"` +} + +func (c MergeBatcherConfig) Validate() error { + if c.MinSizeItems < 0 { + return errors.New("min_size_items must be greater than or equal to zero") + } + if c.Timeout <= 0 { + return errors.New("timeout must be greater than zero") + } + return nil +} + +func NewDefaultMergeBatcherConfig() MergeBatcherConfig { + return MergeBatcherConfig{ + Enabled: true, + Timeout: 200 * time.Millisecond, + MinSizeItems: 8192, + } +} + +// SplitBatcherConfig defines batching configuration for merging or splitting requests based on a timeout and +// minimum and maximum number of items. +// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +type SplitBatcherConfig struct { + // MaxSizeItems is the maximum number of the batch items, i.e. spans, data points or log records for OTLP, + // but can be anything else for other formats. If the batch size exceeds this value, + // it will be broken up into smaller batches if possible. + // Setting this value to zero disables the maximum size limit. + MaxSizeItems int `mapstructure:"max_size_items"` +} + +func (c SplitBatcherConfig) Validate() error { + if c.MaxSizeItems < 0 { + return errors.New("max_size_items must be greater than or equal to zero") + } + return nil +} + +// MergeSplitBatcherConfig defines batching configuration for merging or splitting requests based on a timeout and +// minimum and maximum number of items. +// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +type MergeSplitBatcherConfig struct { + MergeBatcherConfig `mapstructure:",squash"` + SplitBatcherConfig `mapstructure:",squash"` +} + +func (c MergeSplitBatcherConfig) Validate() error { + if c.MaxSizeItems < c.MinSizeItems { + return errors.New("max_size_items must be greater than or equal to min_size_items") + } + return nil +} + +// BatchConfigBatchersLimit defines batching configuration part for setting a maximum number of batchers. +// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +type BatchConfigBatchersLimit struct { + // BatchersLimit is the maximum number of batchers that can be used for batching. + // Requests producing batch identifiers that exceed this limit will be dropped. + // If this value is zero, then there is no limit on the number of batchers. + BatchersLimit int `mapstructure:"batchers_limit"` +} + +// BatchMergeFunc is a function that merges two requests into a single request. +// Context will be propagated from the first request. +// Do not mutate the requests passed to the function if error can be returned after mutation. +// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +type BatchMergeFunc func(context.Context, Request, Request) (Request, error) + +// BatchMergeSplitFunc is a function that merge and/or splits a request into multiple requests based on the provided +// limit of maximum number of items. All the returned requests MUST have a number of items that does not exceed the +// maximum number of items. Size of the last returned request MUST be less or equal than the size of any other returned +// request. The original request MUST not be mutated if error is returned. The length of the returned slice MUST not +// be 0. The optionalReq argument can be nil, make sure to check it before using. maxItems argument is guaranteed to be +// greater than 0. Context will be propagated from the original request. +// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +type BatchMergeSplitFunc func(ctx context.Context, optionalReq Request, req Request, maxItems int) ([]Request, error) + +type BatcherOption func(*batchSender) + +func WithSplitBatcher(cfg SplitBatcherConfig, msf BatchMergeSplitFunc) BatcherOption { + return func(b *batchSender) { + if cfg.MaxSizeItems != 0 { + b.splitCfg = cfg + b.mergeSplitFunc = msf + } + } +} + +// batchSender is a component that accepts places requests into batches before passing them to the downstream senders. +// +// batch_processor implements consumer.Traces and consumer.Metrics +// +// Batches are sent out with any of the following conditions: +// - batch size reaches cfg.SendBatchSize +// - cfg.Timeout is elapsed since the timestamp when the previous batch was sent out. +type batchSender struct { + baseRequestSender + mergeCfg MergeBatcherConfig + splitCfg SplitBatcherConfig + mergeFunc BatchMergeFunc + mergeSplitFunc BatchMergeSplitFunc + + // concurrencyLimit is the maximum number of goroutines that can be created by the batcher. + // If this number is reached and all the goroutines are busy, the batch will be sent right away. + // Populated from the number of queue consumers if queue is enabled. + concurrencyLimit uint64 + activeRequests atomic.Uint64 + + resetTimerCh chan struct{} + + mu sync.Mutex + activeBatch *batch + + logger *zap.Logger + + shutdownCh chan struct{} + stopped *atomic.Bool +} + +// newBatchSender returns a new batch consumer component. +func newBatchSender(cfg MergeBatcherConfig, set exporter.CreateSettings, mf BatchMergeFunc, opts ...BatcherOption) requestSender { + bs := &batchSender{ + activeBatch: newEmptyBatch(), + mergeCfg: cfg, + mergeFunc: mf, + logger: set.Logger, + shutdownCh: make(chan struct{}), + stopped: &atomic.Bool{}, + resetTimerCh: make(chan struct{}), + } + + for _, op := range opts { + op(bs) + } + return bs +} + +func (bs *batchSender) Start(_ context.Context, _ component.Host) error { + timer := time.NewTimer(bs.mergeCfg.Timeout) + go func() { + for { + select { + case <-bs.shutdownCh: + bs.mu.Lock() + if bs.activeBatch.request != nil { + bs.exportActiveBatch() + } + bs.mu.Unlock() + if !timer.Stop() { + <-timer.C + } + return + case <-timer.C: + bs.mu.Lock() + if bs.activeBatch.request != nil { + bs.exportActiveBatch() + } + bs.mu.Unlock() + timer.Reset(bs.mergeCfg.Timeout) + case <-bs.resetTimerCh: + if !timer.Stop() { + <-timer.C + } + timer.Reset(bs.mergeCfg.Timeout) + } + } + }() + + return nil +} + +type batch struct { + ctx context.Context + request Request + done chan struct{} + err error +} + +func newEmptyBatch() *batch { + return &batch{ + ctx: context.Background(), + done: make(chan struct{}), + } +} + +// exportActiveBatch exports the active batch asynchronously and replaces it with a new one. +// Caller must hold the lock. +func (bs *batchSender) exportActiveBatch() { + go func(b *batch) { + b.err = b.request.Export(b.ctx) + close(b.done) + }(bs.activeBatch) + bs.activeBatch = newEmptyBatch() +} + +// isActiveBatchReady returns true if the active batch is ready to be exported. +// The batch is ready if it has reached the minimum size or the concurrency limit is reached. +// Caller must hold the lock. +func (bs *batchSender) isActiveBatchReady() bool { + return bs.activeBatch.request.ItemsCount() >= bs.mergeCfg.MinSizeItems || + (bs.concurrencyLimit > 0 && bs.activeRequests.Load() >= bs.concurrencyLimit) +} + +func (bs *batchSender) send(ctx context.Context, req Request) error { + // Stopped batch sender should act as pass-through to allow the queue to be drained. + if bs.stopped.Load() { + return bs.nextSender.send(ctx, req) + } + + bs.activeRequests.Add(1) + defer bs.activeRequests.Add(^uint64(0)) + + if bs.mergeSplitFunc != nil { + return bs.sendMergeSplitBatch(ctx, req) + } + return bs.sendMergeBatch(ctx, req) +} + +// sendMergeSplitBatch sends the request to the batch which may be split into multiple requests. +func (bs *batchSender) sendMergeSplitBatch(ctx context.Context, req Request) error { + bs.mu.Lock() + + reqs, err := bs.mergeSplitFunc(ctx, bs.activeBatch.request, req, bs.splitCfg.MaxSizeItems) + if err != nil || len(reqs) == 0 { + bs.mu.Unlock() + return err + } + if len(reqs) == 1 || bs.activeBatch.request != nil { + bs.updateActiveBatch(ctx, reqs[0]) + batch := bs.activeBatch + if bs.isActiveBatchReady() || len(reqs) > 1 { + bs.exportActiveBatch() + bs.resetTimerCh <- struct{}{} + } + bs.mu.Unlock() + <-batch.done + if batch.err != nil { + return batch.err + } + reqs = reqs[1:] + } else { + bs.mu.Unlock() + } + + // Intentionally do not put the last request in the active batch to not block it. + // TODO: Consider including the partial request in the error to avoid double publishing. + for _, r := range reqs { + if err := r.Export(ctx); err != nil { + return err + } + } + return nil +} + +// sendMergeBatch sends the request to the batch and waits for the batch to be exported. +func (bs *batchSender) sendMergeBatch(ctx context.Context, req Request) error { + bs.mu.Lock() + if bs.activeBatch.request != nil { + var err error + req, err = bs.mergeFunc(ctx, bs.activeBatch.request, req) + if err != nil { + bs.mu.Unlock() + return err + } + } + bs.updateActiveBatch(ctx, req) + batch := bs.activeBatch + if bs.isActiveBatchReady() { + bs.exportActiveBatch() + bs.resetTimerCh <- struct{}{} + } + bs.mu.Unlock() + <-batch.done + return batch.err +} + +// updateActiveBatch update the active batch to the new merged request and context. +// The context is only set once and is not updated after the first call. +// Merging the context would be complex and require an additional goroutine to handle the context cancellation. +// We take the approach of using the context from the first request since it's likely to have the shortest timeout. +func (bs *batchSender) updateActiveBatch(ctx context.Context, req Request) { + if bs.activeBatch.request == nil { + bs.activeBatch.ctx = ctx + } + bs.activeBatch.request = req +} + +func (bs *batchSender) Shutdown(context.Context) error { + bs.stopped.Store(true) + close(bs.shutdownCh) + // Wait for the active requests to finish. + for bs.activeRequests.Load() > 0 { + time.Sleep(10 * time.Millisecond) + } + return nil +} diff --git a/exporter/exporterhelper/batch_sender_test.go b/exporter/exporterhelper/batch_sender_test.go new file mode 100644 index 00000000000..c8d9616128a --- /dev/null +++ b/exporter/exporterhelper/batch_sender_test.go @@ -0,0 +1,433 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package exporterhelper // import "go.opentelemetry.io/collector/exporter/exporterhelper" + +import ( + "context" + "errors" + "fmt" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/exporter/exporterqueue" +) + +func TestBatchSender_MergeBatcherConfig_Validate(t *testing.T) { + cfg := NewDefaultMergeBatcherConfig() + assert.NoError(t, cfg.Validate()) + + cfg.MinSizeItems = -1 + assert.EqualError(t, cfg.Validate(), "min_size_items must be greater than or equal to zero") + + cfg = NewDefaultMergeBatcherConfig() + cfg.Timeout = 0 + assert.EqualError(t, cfg.Validate(), "timeout must be greater than zero") +} + +func TestBatchSender_SplitBatcherConfig_Validate(t *testing.T) { + cfg := SplitBatcherConfig{} + assert.NoError(t, cfg.Validate()) + + cfg.MaxSizeItems = -1 + assert.EqualError(t, cfg.Validate(), "max_size_items must be greater than or equal to zero") +} + +func TestBatchSender_MergeSplitBatcherConfig_Validate(t *testing.T) { + cfg := MergeSplitBatcherConfig{ + MergeBatcherConfig: NewDefaultMergeBatcherConfig(), + SplitBatcherConfig: SplitBatcherConfig{ + MaxSizeItems: 20000, + }, + } + assert.NoError(t, cfg.Validate()) + + cfg.MinSizeItems = 20001 + assert.EqualError(t, cfg.Validate(), "max_size_items must be greater than or equal to min_size_items") +} + +func TestBatchSender_Merge(t *testing.T) { + mergeCfg := NewDefaultMergeBatcherConfig() + mergeCfg.MinSizeItems = 10 + mergeCfg.Timeout = 100 * time.Millisecond + + tests := []struct { + name string + batcherOption Option + }{ + { + name: "merge_only", + batcherOption: WithBatcher(mergeCfg, fakeBatchMergeFunc), + }, + { + name: "split_disabled", + batcherOption: WithBatcher(mergeCfg, fakeBatchMergeFunc, + WithSplitBatcher(SplitBatcherConfig{MaxSizeItems: 0}, fakeBatchMergeSplitFunc)), + }, + { + name: "split_high_limit", + batcherOption: WithBatcher(mergeCfg, fakeBatchMergeFunc, + WithSplitBatcher(SplitBatcherConfig{MaxSizeItems: 1000}, fakeBatchMergeSplitFunc)), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + be := queueBatchExporter(t, tt.batcherOption) + + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { + require.NoError(t, be.Shutdown(context.Background())) + }) + + sink := newFakeRequestSink() + + require.NoError(t, be.send(context.Background(), &fakeRequest{items: 8, sink: sink})) + require.NoError(t, be.send(context.Background(), &fakeRequest{items: 3, sink: sink})) + + // the first two requests should be merged into one and sent by reaching the minimum items size + assert.Eventually(t, func() bool { + return sink.requestsCount.Load() == 1 && sink.itemsCount.Load() == 11 + }, 50*time.Millisecond, 10*time.Millisecond) + + require.NoError(t, be.send(context.Background(), &fakeRequest{items: 3, sink: sink})) + require.NoError(t, be.send(context.Background(), &fakeRequest{items: 1, sink: sink})) + + // the third and fifth requests should be sent by reaching the timeout + // the fourth request should be ignored because of the merge error. + time.Sleep(50 * time.Millisecond) + + // should be ignored because of the merge error. + require.NoError(t, be.send(context.Background(), &fakeRequest{items: 3, sink: sink, + mergeErr: errors.New("merge error")})) + + assert.Equal(t, uint64(1), sink.requestsCount.Load()) + assert.Eventually(t, func() bool { + return sink.requestsCount.Load() == 2 && sink.itemsCount.Load() == 15 + }, 100*time.Millisecond, 10*time.Millisecond) + }) + } +} + +func TestBatchSender_BatchExportError(t *testing.T) { + mergeCfg := NewDefaultMergeBatcherConfig() + mergeCfg.MinSizeItems = 10 + tests := []struct { + name string + batcherOption Option + expectedRequests uint64 + expectedItems uint64 + }{ + { + name: "merge_only", + batcherOption: WithBatcher(mergeCfg, fakeBatchMergeFunc), + }, + { + name: "merge_with_split_triggered", + batcherOption: WithBatcher(mergeCfg, fakeBatchMergeFunc, + WithSplitBatcher(SplitBatcherConfig{MaxSizeItems: 200}, fakeBatchMergeSplitFunc)), + }, + { + name: "merge_with_split_triggered", + batcherOption: WithBatcher(mergeCfg, fakeBatchMergeFunc, + WithSplitBatcher(SplitBatcherConfig{MaxSizeItems: 20}, fakeBatchMergeSplitFunc)), + expectedRequests: 1, + expectedItems: 20, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + be := queueBatchExporter(t, tt.batcherOption) + + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { + require.NoError(t, be.Shutdown(context.Background())) + }) + + sink := newFakeRequestSink() + + require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink})) + require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink})) + + // the first two requests should be blocked by the batchSender. + time.Sleep(50 * time.Millisecond) + assert.Equal(t, uint64(0), sink.requestsCount.Load()) + + // the third request should trigger the export and cause an error. + errReq := &fakeRequest{items: 20, exportErr: errors.New("transient error"), sink: sink} + require.NoError(t, be.send(context.Background(), errReq)) + + // the batch should be dropped since the queue doesn't have requeuing enabled. + assert.Eventually(t, func() bool { + return sink.requestsCount.Load() == tt.expectedRequests && + sink.itemsCount.Load() == tt.expectedItems && + be.batchSender.(*batchSender).activeRequests.Load() == uint64(0) && + be.queueSender.(*queueSender).queue.Size() == 0 + }, 100*time.Millisecond, 10*time.Millisecond) + }) + } +} + +func TestBatchSender_MergeOrSplit(t *testing.T) { + mergeCfg := NewDefaultMergeBatcherConfig() + mergeCfg.MinSizeItems = 5 + mergeCfg.Timeout = 100 * time.Millisecond + be := queueBatchExporter(t, WithBatcher(mergeCfg, fakeBatchMergeFunc, + WithSplitBatcher(SplitBatcherConfig{MaxSizeItems: 10}, fakeBatchMergeSplitFunc))) + + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { + require.NoError(t, be.Shutdown(context.Background())) + }) + + sink := newFakeRequestSink() + + // should be sent right away by reaching the minimum items size. + require.NoError(t, be.send(context.Background(), &fakeRequest{items: 8, sink: sink})) + assert.Eventually(t, func() bool { + return sink.requestsCount.Load() == 1 && sink.itemsCount.Load() == 8 + }, 50*time.Millisecond, 10*time.Millisecond) + + // big request should be broken down into two requests, both are sent right away. + require.NoError(t, be.send(context.Background(), &fakeRequest{items: 17, sink: sink})) + + assert.Eventually(t, func() bool { + return sink.requestsCount.Load() == 3 && sink.itemsCount.Load() == 25 + }, 50*time.Millisecond, 10*time.Millisecond) + + // request that cannot be split should be dropped. + require.NoError(t, be.send(context.Background(), &fakeRequest{items: 11, sink: sink, + mergeErr: errors.New("split error")})) + + // big request should be broken down into two requests, both are sent right away. + require.NoError(t, be.send(context.Background(), &fakeRequest{items: 13, sink: sink})) + + assert.Eventually(t, func() bool { + return sink.requestsCount.Load() == 5 && sink.itemsCount.Load() == 38 + }, 50*time.Millisecond, 10*time.Millisecond) + + fmt.Println("TestBatchSender_MergeOrSplit") +} + +func TestBatchSender_Shutdown(t *testing.T) { + batchCfg := NewDefaultMergeBatcherConfig() + batchCfg.MinSizeItems = 10 + be := queueBatchExporter(t, WithBatcher(batchCfg, fakeBatchMergeFunc)) + + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + + sink := newFakeRequestSink() + require.NoError(t, be.send(context.Background(), &fakeRequest{items: 3, sink: sink})) + + // To make the request reached the batchSender before shutdown. + time.Sleep(50 * time.Millisecond) + + require.NoError(t, be.Shutdown(context.Background())) + + // shutdown should force sending the batch + assert.Equal(t, uint64(1), sink.requestsCount.Load()) + assert.Equal(t, uint64(3), sink.itemsCount.Load()) +} + +func TestBatchSender_Disabled(t *testing.T) { + mergeCfg := NewDefaultMergeBatcherConfig() + mergeCfg.Enabled = false + be, err := newBaseExporter(defaultSettings, "", true, nil, nil, newNoopObsrepSender, + WithBatcher(mergeCfg, fakeBatchMergeFunc, WithSplitBatcher(SplitBatcherConfig{MaxSizeItems: 10}, fakeBatchMergeSplitFunc))) + require.NotNil(t, be) + require.NoError(t, err) + + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { + require.NoError(t, be.Shutdown(context.Background())) + }) + + sink := newFakeRequestSink() + // should be sent right away because batching is disabled. + require.NoError(t, be.send(context.Background(), &fakeRequest{items: 8, sink: sink})) + assert.Equal(t, uint64(1), sink.requestsCount.Load()) + assert.Equal(t, uint64(8), sink.itemsCount.Load()) +} + +func TestBatchSender_InvalidMergeSplitFunc(t *testing.T) { + invalidMergeSplitFunc := func(_ context.Context, _ Request, req2 Request, _ int) ([]Request, error) { + // reply with invalid 0 length slice if req2 is more than 20 items + if req2.(*fakeRequest).items > 20 { + return []Request{}, nil + } + // otherwise reply with a single request. + return []Request{req2}, nil + } + mergeCfg := NewDefaultMergeBatcherConfig() + mergeCfg.Timeout = 50 * time.Millisecond + be := queueBatchExporter(t, WithBatcher(mergeCfg, fakeBatchMergeFunc, + WithSplitBatcher(SplitBatcherConfig{MaxSizeItems: 20}, invalidMergeSplitFunc))) + + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { + require.NoError(t, be.Shutdown(context.Background())) + }) + + sink := newFakeRequestSink() + // first request should be ignored due to invalid merge/split function. + require.NoError(t, be.send(context.Background(), &fakeRequest{items: 30, sink: sink})) + // second request should be sent after reaching the timeout. + require.NoError(t, be.send(context.Background(), &fakeRequest{items: 15, sink: sink})) + assert.Eventually(t, func() bool { + return sink.requestsCount.Load() == 1 && sink.itemsCount.Load() == 15 + }, 100*time.Millisecond, 10*time.Millisecond) +} + +func TestBatchSender_PostShutdown(t *testing.T) { + be, err := newBaseExporter(defaultSettings, "", true, nil, nil, + newNoopObsrepSender, WithBatcher(NewDefaultMergeBatcherConfig(), fakeBatchMergeFunc)) + require.NotNil(t, be) + require.NoError(t, err) + assert.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + assert.NoError(t, be.Shutdown(context.Background())) + + // Closed batch sender should act as a pass-through to not block queue draining. + sink := newFakeRequestSink() + require.NoError(t, be.send(context.Background(), &fakeRequest{items: 8, sink: sink})) + assert.Equal(t, uint64(1), sink.requestsCount.Load()) + assert.Equal(t, uint64(8), sink.itemsCount.Load()) +} + +func TestBatchSender_ConcurrencyLimitReached(t *testing.T) { + qCfg := exporterqueue.NewDefaultConfig() + qCfg.NumConsumers = 2 + be, err := newBaseExporter(defaultSettings, "", true, nil, nil, + newNoopObsrepSender, WithBatcher(NewDefaultMergeBatcherConfig(), fakeBatchMergeFunc), + WithRequestQueue(qCfg, exporterqueue.NewMemoryQueueFactory[Request]())) + require.NotNil(t, be) + require.NoError(t, err) + assert.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { + assert.NoError(t, be.Shutdown(context.Background())) + }) + + sink := newFakeRequestSink() + assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 8, sink: sink})) + + time.Sleep(50 * time.Millisecond) + // the first request should be still in-flight. + assert.Equal(t, uint64(0), sink.requestsCount.Load()) + + // the second request should be sent by reaching max concurrency limit. + assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 8, sink: sink})) + + assert.Eventually(t, func() bool { + return sink.requestsCount.Load() == 1 && sink.itemsCount.Load() == 16 + }, 100*time.Millisecond, 10*time.Millisecond) +} + +func TestBatchSender_BatchBlocking(t *testing.T) { + bCfg := NewDefaultMergeBatcherConfig() + bCfg.MinSizeItems = 3 + be, err := newBaseExporter(defaultSettings, "", true, nil, nil, + newNoopObsrepSender, WithBatcher(bCfg, fakeBatchMergeFunc)) + require.NotNil(t, be) + require.NoError(t, err) + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + + sink := newFakeRequestSink() + + // send 6 blocking requests + wg := sync.WaitGroup{} + for i := 0; i < 6; i++ { + wg.Add(1) + go func() { + assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 1, sink: sink, delay: 10 * time.Millisecond})) + wg.Done() + }() + } + wg.Wait() + + // should be sent in two batches since the batch size is 3 + assert.Equal(t, uint64(2), sink.requestsCount.Load()) + assert.Equal(t, uint64(6), sink.itemsCount.Load()) + + require.NoError(t, be.Shutdown(context.Background())) +} + +// Validate that the batch is cancelled once the first request in the request is cancelled +func TestBatchSender_BatchCancelled(t *testing.T) { + bCfg := NewDefaultMergeBatcherConfig() + bCfg.MinSizeItems = 2 + be, err := newBaseExporter(defaultSettings, "", true, nil, nil, + newNoopObsrepSender, WithBatcher(bCfg, fakeBatchMergeFunc)) + require.NotNil(t, be) + require.NoError(t, err) + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + + sink := newFakeRequestSink() + + // send 2 blocking requests + wg := sync.WaitGroup{} + ctx, cancel := context.WithCancel(context.Background()) + wg.Add(1) + go func() { + assert.ErrorIs(t, be.send(ctx, &fakeRequest{items: 1, sink: sink, delay: 100 * time.Millisecond}), context.Canceled) + wg.Done() + }() + wg.Add(1) + go func() { + time.Sleep(20 * time.Millisecond) // ensure this call is the second + assert.ErrorIs(t, be.send(context.Background(), &fakeRequest{items: 1, sink: sink, delay: 100 * time.Millisecond}), context.Canceled) + wg.Done() + }() + cancel() // canceling the first request should cancel the whole batch + wg.Wait() + + // nothing should be delivered + assert.Equal(t, uint64(0), sink.requestsCount.Load()) + assert.Equal(t, uint64(0), sink.itemsCount.Load()) + + require.NoError(t, be.Shutdown(context.Background())) +} + +func TestBatchSender_DrainActiveRequests(t *testing.T) { + bCfg := NewDefaultMergeBatcherConfig() + bCfg.MinSizeItems = 2 + be, err := newBaseExporter(defaultSettings, "", true, nil, nil, + newNoopObsrepSender, WithBatcher(bCfg, fakeBatchMergeFunc)) + require.NotNil(t, be) + require.NoError(t, err) + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + + sink := newFakeRequestSink() + + // send 3 blocking requests with a timeout + go func() { + assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 1, sink: sink, delay: 40 * time.Millisecond})) + }() + go func() { + assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 1, sink: sink, delay: 40 * time.Millisecond})) + }() + go func() { + assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 1, sink: sink, delay: 40 * time.Millisecond})) + }() + + // give time for the first two requests to be batched + time.Sleep(20 * time.Millisecond) + + // Shutdown should force the active batch to be dispatched and wait for all batches to be delivered. + // It should take 120 milliseconds to complete. + require.NoError(t, be.Shutdown(context.Background())) + + assert.Equal(t, uint64(2), sink.requestsCount.Load()) + assert.Equal(t, uint64(3), sink.itemsCount.Load()) +} + +func queueBatchExporter(t *testing.T, batchOption Option) *baseExporter { + be, err := newBaseExporter(defaultSettings, "", true, nil, nil, + newNoopObsrepSender, batchOption, WithRequestQueue(exporterqueue.NewDefaultConfig(), + exporterqueue.NewMemoryQueueFactory[Request]())) + require.NotNil(t, be) + require.NoError(t, err) + return be +} diff --git a/exporter/exporterhelper/common.go b/exporter/exporterhelper/common.go index f1080bdee04..017272cb387 100644 --- a/exporter/exporterhelper/common.go +++ b/exporter/exporterhelper/common.go @@ -135,6 +135,12 @@ func WithCapabilities(capabilities consumer.Capabilities) Option { } } +func WithBatcher(cfg MergeBatcherConfig, mf BatchMergeFunc, opts ...BatcherOption) Option { + return func(o *baseExporter) { + o.batchSender = newBatchSender(cfg, o.set, mf, opts...) + } +} + // baseExporter contains common fields between different exporter types. type baseExporter struct { component.StartFunc @@ -154,6 +160,7 @@ type baseExporter struct { // Chain of senders that the exporter helper applies before passing the data to the actual exporter. // The data is handled by each sender in the respective order starting from the queueSender. // Most of the senders are optional, and initialized with a no-op path-through sender. + batchSender requestSender queueSender requestSender obsrepSender requestSender retrySender requestSender @@ -177,6 +184,7 @@ func newBaseExporter(set exporter.CreateSettings, signal component.DataType, req unmarshaler: unmarshaler, signal: signal, + batchSender: &baseRequestSender{}, queueSender: &baseRequestSender{}, obsrepSender: osf(obsReport), retrySender: &baseRequestSender{}, @@ -191,6 +199,13 @@ func newBaseExporter(set exporter.CreateSettings, signal component.DataType, req } be.connectSenders() + // If queue sender is enabled assign to the batch sender the same number of workers. + if qs, ok := be.queueSender.(*queueSender); ok { + if bs, ok := be.batchSender.(*batchSender); ok { + bs.concurrencyLimit = uint64(qs.numConsumers) + } + } + return be, nil } @@ -206,7 +221,8 @@ func (be *baseExporter) send(ctx context.Context, req Request) error { // connectSenders connects the senders in the predefined order. func (be *baseExporter) connectSenders() { - be.queueSender.setNextSender(be.obsrepSender) + be.queueSender.setNextSender(be.batchSender) + be.batchSender.setNextSender(be.obsrepSender) be.obsrepSender.setNextSender(be.retrySender) be.retrySender.setNextSender(be.timeoutSender) } @@ -217,7 +233,12 @@ func (be *baseExporter) Start(ctx context.Context, host component.Host) error { return err } - // If no error then start the queueSender. + // If no error then start the batchSender. + if err := be.batchSender.Start(ctx, host); err != nil { + return err + } + + // Last start the queueSender. return be.queueSender.Start(ctx, host) } @@ -225,6 +246,8 @@ func (be *baseExporter) Shutdown(ctx context.Context) error { return multierr.Combine( // First shutdown the retry sender, so the queue sender can flush the queue without retries. be.retrySender.Shutdown(ctx), + // Then shutdown the batch sender + be.batchSender.Shutdown(ctx), // Then shutdown the queue sender. be.queueSender.Shutdown(ctx), // Last shutdown the wrapped exporter itself. diff --git a/exporter/exporterhelper/queue_sender.go b/exporter/exporterhelper/queue_sender.go index 3092c60c98f..3e539d7f9a0 100644 --- a/exporter/exporterhelper/queue_sender.go +++ b/exporter/exporterhelper/queue_sender.go @@ -31,7 +31,9 @@ var ( type QueueSettings struct { // Enabled indicates whether to not enqueue batches before sending to the consumerSender. Enabled bool `mapstructure:"enabled"` - // NumConsumers is the number of consumers from the queue. + // NumConsumers is the number of consumers from the queue. Defaults to 10. + // If batching is enabled, a combined batch cannot contain more requests than the number of consumers. + // So it's recommended to set higher number of consumers if batching is enabled. NumConsumers int `mapstructure:"num_consumers"` // QueueSize is the maximum number of batches allowed in queue at a given time. QueueSize int `mapstructure:"queue_size"` @@ -73,6 +75,7 @@ type queueSender struct { baseRequestSender fullName string queue exporterqueue.Queue[Request] + numConsumers int traceAttribute attribute.KeyValue logger *zap.Logger meter otelmetric.Meter @@ -87,6 +90,7 @@ func newQueueSender(q exporterqueue.Queue[Request], set exporter.CreateSettings, qs := &queueSender{ fullName: set.ID.String(), queue: q, + numConsumers: numConsumers, traceAttribute: attribute.String(obsmetrics.ExporterKey, set.ID.String()), logger: set.TelemetrySettings.Logger, meter: set.TelemetrySettings.MeterProvider.Meter(scopeName), diff --git a/exporter/exporterhelper/request_test.go b/exporter/exporterhelper/request_test.go index 18e8228f946..12637f12da3 100644 --- a/exporter/exporterhelper/request_test.go +++ b/exporter/exporterhelper/request_test.go @@ -5,25 +5,115 @@ package exporterhelper import ( "context" + "sync/atomic" + "time" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" ) +type fakeRequestSink struct { + requestsCount *atomic.Uint64 + itemsCount *atomic.Uint64 +} + +func newFakeRequestSink() *fakeRequestSink { + return &fakeRequestSink{ + requestsCount: &atomic.Uint64{}, + itemsCount: &atomic.Uint64{}, + } +} + type fakeRequest struct { - items int - err error + items int + exportErr error + mergeErr error + delay time.Duration + sink *fakeRequestSink } -func (r fakeRequest) Export(context.Context) error { - return r.err +func (r *fakeRequest) Export(ctx context.Context) error { + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(r.delay): + } + if r.exportErr != nil { + return r.exportErr + } + if r.sink != nil { + r.sink.requestsCount.Add(1) + r.sink.itemsCount.Add(uint64(r.items)) + } + return nil } -func (r fakeRequest) ItemsCount() int { +func (r *fakeRequest) ItemsCount() int { return r.items } +func fakeBatchMergeFunc(_ context.Context, r1 Request, r2 Request) (Request, error) { + if r1 == nil { + return r2, nil + } + fr1 := r1.(*fakeRequest) + fr2 := r2.(*fakeRequest) + if fr2.mergeErr != nil { + return nil, fr2.mergeErr + } + return &fakeRequest{ + items: fr1.items + fr2.items, + sink: fr1.sink, + exportErr: fr2.exportErr, + delay: fr1.delay + fr2.delay, + }, nil +} + +func fakeBatchMergeSplitFunc(ctx context.Context, r1 Request, r2 Request, maxItems int) ([]Request, error) { + if maxItems == 0 { + r, err := fakeBatchMergeFunc(ctx, r1, r2) + return []Request{r}, err + } + + if r2.(*fakeRequest).mergeErr != nil { + return nil, r2.(*fakeRequest).mergeErr + } + + fr2 := r2.(*fakeRequest) + fr2 = &fakeRequest{items: fr2.items, sink: fr2.sink, exportErr: fr2.exportErr, delay: fr2.delay} + var res []Request + + // fill fr1 to maxItems if it's not nil + if r1 != nil { + fr1 := r1.(*fakeRequest) + fr1 = &fakeRequest{items: fr1.items, sink: fr1.sink, exportErr: fr1.exportErr, delay: fr1.delay} + if fr2.items <= maxItems-fr1.items { + fr1.items += fr2.items + if fr2.exportErr != nil { + fr1.exportErr = fr2.exportErr + } + return []Request{fr1}, nil + } + // if split is needed, we don't propagate exportErr from fr2 to fr1 to test more cases + fr2.items -= maxItems - fr1.items + fr1.items = maxItems + res = append(res, fr1) + } + + // split fr2 to maxItems + for { + if fr2.items <= maxItems { + res = append(res, &fakeRequest{items: fr2.items, sink: fr2.sink, exportErr: fr2.exportErr, delay: fr2.delay}) + break + } + res = append(res, &fakeRequest{items: maxItems, sink: fr2.sink, exportErr: fr2.exportErr, delay: fr2.delay}) + fr2.items -= maxItems + } + + return res, nil +} + type fakeRequestConverter struct { metricsError error tracesError error @@ -32,13 +122,13 @@ type fakeRequestConverter struct { } func (frc *fakeRequestConverter) requestFromMetricsFunc(_ context.Context, md pmetric.Metrics) (Request, error) { - return fakeRequest{items: md.DataPointCount(), err: frc.requestError}, frc.metricsError + return &fakeRequest{items: md.DataPointCount(), exportErr: frc.requestError}, frc.metricsError } func (frc *fakeRequestConverter) requestFromTracesFunc(_ context.Context, md ptrace.Traces) (Request, error) { - return fakeRequest{items: md.SpanCount(), err: frc.requestError}, frc.tracesError + return &fakeRequest{items: md.SpanCount(), exportErr: frc.requestError}, frc.tracesError } func (frc *fakeRequestConverter) requestFromLogsFunc(_ context.Context, md plog.Logs) (Request, error) { - return fakeRequest{items: md.LogRecordCount(), err: frc.requestError}, frc.logsError + return &fakeRequest{items: md.LogRecordCount(), exportErr: frc.requestError}, frc.logsError }