diff --git a/.chloggen/batch-exporter-helper.yaml b/.chloggen/batch-exporter-helper.yaml new file mode 100755 index 00000000000..613cfbac5b2 --- /dev/null +++ b/.chloggen/batch-exporter-helper.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: exporterhelper + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add experimental batching capabilities to the exporter helper + +# One or more tracking issues or pull requests related to the change +issues: [8122] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] \ No newline at end of file diff --git a/exporter/exporterbatcher/batch_func.go b/exporter/exporterbatcher/batch_func.go new file mode 100644 index 00000000000..951a5ca897a --- /dev/null +++ b/exporter/exporterbatcher/batch_func.go @@ -0,0 +1,24 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package exporterbatcher // import "go.opentelemetry.io/collector/exporter/exporterbatcher" + +import "context" + +// BatchMergeFunc is a function that merges two requests into a single request. +// Do not mutate the requests passed to the function if error can be returned after mutation or if the exporter is +// marked as not mutable. +// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +type BatchMergeFunc[T any] func(context.Context, T, T) (T, error) + +// BatchMergeSplitFunc is a function that merge and/or splits one or two requests into multiple requests based on the +// configured limit provided in MaxSizeConfig. +// All the returned requests MUST have a number of items that does not exceed the maximum number of items. +// Size of the last returned request MUST be less or equal than the size of any other returned request. +// The original request MUST not be mutated if error is returned after mutation or if the exporter is +// marked as not mutable. The length of the returned slice MUST not be 0. The optionalReq argument can be nil, +// make sure to check it before using. +// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +type BatchMergeSplitFunc[T any] func(ctx context.Context, cfg MaxSizeConfig, optionalReq T, req T) ([]T, error) diff --git a/exporter/exporterbatcher/config.go b/exporter/exporterbatcher/config.go new file mode 100644 index 00000000000..c7ed15c4641 --- /dev/null +++ b/exporter/exporterbatcher/config.go @@ -0,0 +1,70 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package exporterbatcher // import "go.opentelemetry.io/collector/exporter/exporterbatcher" + +import ( + "errors" + "time" +) + +// Config defines a configuration for batching requests based on a timeout and a minimum number of items. +// MaxSizeItems defines batch splitting functionality if it's more than zero. +// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +type Config struct { + // Enabled indicates whether to not enqueue batches before sending to the consumerSender. + Enabled bool `mapstructure:"enabled"` + + // FlushTimeout sets the time after which a batch will be sent regardless of its size. + FlushTimeout time.Duration `mapstructure:"flush_timeout"` + + MinSizeConfig `mapstructure:",squash"` + MaxSizeConfig `mapstructure:",squash"` +} + +// MinSizeConfig defines the configuration for the minimum number of items in a batch. +// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +type MinSizeConfig struct { + // MinSizeItems is the number of items (spans, data points or log records for OTLP) at which the batch should be + // sent regardless of the timeout. There is no guarantee that the batch size always greater than this value. + // This option requires the Request to implement RequestItemsCounter interface. Otherwise, it will be ignored. + MinSizeItems int `mapstructure:"min_size_items"` +} + +// MaxSizeConfig defines the configuration for the maximum number of items in a batch. +// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +type MaxSizeConfig struct { + // MaxSizeItems is the maximum number of the batch items, i.e. spans, data points or log records for OTLP. + // If the batch size exceeds this value, it will be broken up into smaller batches if possible. + // Setting this value to zero disables the maximum size limit. + MaxSizeItems int `mapstructure:"max_size_items"` +} + +func (c Config) Validate() error { + if c.MinSizeItems < 0 { + return errors.New("min_size_items must be greater than or equal to zero") + } + if c.MaxSizeItems < 0 { + return errors.New("max_size_items must be greater than or equal to zero") + } + if c.MaxSizeItems != 0 && c.MaxSizeItems < c.MinSizeItems { + return errors.New("max_size_items must be greater than or equal to min_size_items") + } + if c.FlushTimeout <= 0 { + return errors.New("timeout must be greater than zero") + } + return nil +} + +func NewDefaultConfig() Config { + return Config{ + Enabled: true, + FlushTimeout: 200 * time.Millisecond, + MinSizeConfig: MinSizeConfig{ + MinSizeItems: 8192, + }, + } +} diff --git a/exporter/exporterbatcher/config_test.go b/exporter/exporterbatcher/config_test.go new file mode 100644 index 00000000000..337f83ce318 --- /dev/null +++ b/exporter/exporterbatcher/config_test.go @@ -0,0 +1,30 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package exporterbatcher + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestConfig_Validate(t *testing.T) { + cfg := NewDefaultConfig() + assert.NoError(t, cfg.Validate()) + + cfg.MinSizeItems = -1 + assert.EqualError(t, cfg.Validate(), "min_size_items must be greater than or equal to zero") + + cfg = NewDefaultConfig() + cfg.FlushTimeout = 0 + assert.EqualError(t, cfg.Validate(), "timeout must be greater than zero") + + cfg.MaxSizeItems = -1 + assert.EqualError(t, cfg.Validate(), "max_size_items must be greater than or equal to zero") + + cfg = NewDefaultConfig() + cfg.MaxSizeItems = 20000 + cfg.MinSizeItems = 20001 + assert.EqualError(t, cfg.Validate(), "max_size_items must be greater than or equal to min_size_items") +} diff --git a/exporter/exporterhelper/batch_sender.go b/exporter/exporterhelper/batch_sender.go new file mode 100644 index 00000000000..25873f25b87 --- /dev/null +++ b/exporter/exporterhelper/batch_sender.go @@ -0,0 +1,220 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package exporterhelper // import "go.opentelemetry.io/collector/exporter/exporterhelper" + +import ( + "context" + "sync" + "sync/atomic" + "time" + + "go.uber.org/zap" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/exporter" + "go.opentelemetry.io/collector/exporter/exporterbatcher" +) + +// batchSender is a component that accepts places requests into batches before passing them to the downstream senders. +// +// batch_processor implements consumer.Traces and consumer.Metrics +// +// Batches are sent out with any of the following conditions: +// - batch size reaches cfg.SendBatchSize +// - cfg.Timeout is elapsed since the timestamp when the previous batch was sent out. +type batchSender struct { + baseRequestSender + cfg exporterbatcher.Config + mergeFunc exporterbatcher.BatchMergeFunc[Request] + mergeSplitFunc exporterbatcher.BatchMergeSplitFunc[Request] + + // concurrencyLimit is the maximum number of goroutines that can be created by the batcher. + // If this number is reached and all the goroutines are busy, the batch will be sent right away. + // Populated from the number of queue consumers if queue is enabled. + concurrencyLimit uint64 + activeRequests atomic.Uint64 + + resetTimerCh chan struct{} + + mu sync.Mutex + activeBatch *batch + + logger *zap.Logger + + shutdownCh chan struct{} + stopped *atomic.Bool +} + +// newBatchSender returns a new batch consumer component. +func newBatchSender(cfg exporterbatcher.Config, set exporter.CreateSettings) *batchSender { + bs := &batchSender{ + activeBatch: newEmptyBatch(), + cfg: cfg, + logger: set.Logger, + shutdownCh: make(chan struct{}), + stopped: &atomic.Bool{}, + resetTimerCh: make(chan struct{}), + } + return bs +} + +func (bs *batchSender) Start(_ context.Context, _ component.Host) error { + timer := time.NewTimer(bs.cfg.FlushTimeout) + go func() { + for { + select { + case <-bs.shutdownCh: + bs.mu.Lock() + if bs.activeBatch.request != nil { + bs.exportActiveBatch() + } + bs.mu.Unlock() + if !timer.Stop() { + <-timer.C + } + return + case <-timer.C: + bs.mu.Lock() + if bs.activeBatch.request != nil { + bs.exportActiveBatch() + } + bs.mu.Unlock() + timer.Reset(bs.cfg.FlushTimeout) + case <-bs.resetTimerCh: + if !timer.Stop() { + <-timer.C + } + timer.Reset(bs.cfg.FlushTimeout) + } + } + }() + + return nil +} + +type batch struct { + ctx context.Context + request Request + done chan struct{} + err error +} + +func newEmptyBatch() *batch { + return &batch{ + ctx: context.Background(), + done: make(chan struct{}), + } +} + +// exportActiveBatch exports the active batch asynchronously and replaces it with a new one. +// Caller must hold the lock. +func (bs *batchSender) exportActiveBatch() { + go func(b *batch) { + b.err = b.request.Export(b.ctx) + close(b.done) + }(bs.activeBatch) + bs.activeBatch = newEmptyBatch() +} + +// isActiveBatchReady returns true if the active batch is ready to be exported. +// The batch is ready if it has reached the minimum size or the concurrency limit is reached. +// Caller must hold the lock. +func (bs *batchSender) isActiveBatchReady() bool { + return bs.activeBatch.request.ItemsCount() >= bs.cfg.MinSizeItems || + (bs.concurrencyLimit > 0 && bs.activeRequests.Load() >= bs.concurrencyLimit) +} + +func (bs *batchSender) send(ctx context.Context, req Request) error { + // Stopped batch sender should act as pass-through to allow the queue to be drained. + if bs.stopped.Load() { + return bs.nextSender.send(ctx, req) + } + + bs.activeRequests.Add(1) + defer bs.activeRequests.Add(^uint64(0)) + + if bs.cfg.MaxSizeItems > 0 { + return bs.sendMergeSplitBatch(ctx, req) + } + return bs.sendMergeBatch(ctx, req) +} + +// sendMergeSplitBatch sends the request to the batch which may be split into multiple requests. +func (bs *batchSender) sendMergeSplitBatch(ctx context.Context, req Request) error { + bs.mu.Lock() + + reqs, err := bs.mergeSplitFunc(ctx, bs.cfg.MaxSizeConfig, bs.activeBatch.request, req) + if err != nil || len(reqs) == 0 { + bs.mu.Unlock() + return err + } + if len(reqs) == 1 || bs.activeBatch.request != nil { + bs.updateActiveBatch(ctx, reqs[0]) + batch := bs.activeBatch + if bs.isActiveBatchReady() || len(reqs) > 1 { + bs.exportActiveBatch() + bs.resetTimerCh <- struct{}{} + } + bs.mu.Unlock() + <-batch.done + if batch.err != nil { + return batch.err + } + reqs = reqs[1:] + } else { + bs.mu.Unlock() + } + + // Intentionally do not put the last request in the active batch to not block it. + // TODO: Consider including the partial request in the error to avoid double publishing. + for _, r := range reqs { + if err := r.Export(ctx); err != nil { + return err + } + } + return nil +} + +// sendMergeBatch sends the request to the batch and waits for the batch to be exported. +func (bs *batchSender) sendMergeBatch(ctx context.Context, req Request) error { + bs.mu.Lock() + if bs.activeBatch.request != nil { + var err error + req, err = bs.mergeFunc(ctx, bs.activeBatch.request, req) + if err != nil { + bs.mu.Unlock() + return err + } + } + bs.updateActiveBatch(ctx, req) + batch := bs.activeBatch + if bs.isActiveBatchReady() { + bs.exportActiveBatch() + bs.resetTimerCh <- struct{}{} + } + bs.mu.Unlock() + <-batch.done + return batch.err +} + +// updateActiveBatch update the active batch to the new merged request and context. +// The context is only set once and is not updated after the first call. +// Merging the context would be complex and require an additional goroutine to handle the context cancellation. +// We take the approach of using the context from the first request since it's likely to have the shortest timeout. +func (bs *batchSender) updateActiveBatch(ctx context.Context, req Request) { + if bs.activeBatch.request == nil { + bs.activeBatch.ctx = ctx + } + bs.activeBatch.request = req +} + +func (bs *batchSender) Shutdown(context.Context) error { + bs.stopped.Store(true) + close(bs.shutdownCh) + // Wait for the active requests to finish. + for bs.activeRequests.Load() > 0 { + time.Sleep(10 * time.Millisecond) + } + return nil +} diff --git a/exporter/exporterhelper/batch_sender_test.go b/exporter/exporterhelper/batch_sender_test.go new file mode 100644 index 00000000000..b48cf6255fa --- /dev/null +++ b/exporter/exporterhelper/batch_sender_test.go @@ -0,0 +1,407 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package exporterhelper // import "go.opentelemetry.io/collector/exporter/exporterhelper" + +import ( + "context" + "errors" + "fmt" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/exporter/exporterbatcher" + "go.opentelemetry.io/collector/exporter/exporterqueue" +) + +func TestBatchSender_Merge(t *testing.T) { + cfg := exporterbatcher.NewDefaultConfig() + cfg.MinSizeItems = 10 + cfg.FlushTimeout = 100 * time.Millisecond + + tests := []struct { + name string + batcherOption Option + }{ + { + name: "split_disabled", + batcherOption: WithBatcher(cfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)), + }, + { + name: "split_high_limit", + batcherOption: func() Option { + c := cfg + c.MaxSizeItems = 1000 + return WithBatcher(c, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)) + }(), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + be := queueBatchExporter(t, tt.batcherOption) + + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { + require.NoError(t, be.Shutdown(context.Background())) + }) + + sink := newFakeRequestSink() + + require.NoError(t, be.send(context.Background(), &fakeRequest{items: 8, sink: sink})) + require.NoError(t, be.send(context.Background(), &fakeRequest{items: 3, sink: sink})) + + // the first two requests should be merged into one and sent by reaching the minimum items size + assert.Eventually(t, func() bool { + return sink.requestsCount.Load() == 1 && sink.itemsCount.Load() == 11 + }, 50*time.Millisecond, 10*time.Millisecond) + + require.NoError(t, be.send(context.Background(), &fakeRequest{items: 3, sink: sink})) + require.NoError(t, be.send(context.Background(), &fakeRequest{items: 1, sink: sink})) + + // the third and fifth requests should be sent by reaching the timeout + // the fourth request should be ignored because of the merge error. + time.Sleep(50 * time.Millisecond) + + // should be ignored because of the merge error. + require.NoError(t, be.send(context.Background(), &fakeRequest{items: 3, sink: sink, + mergeErr: errors.New("merge error")})) + + assert.Equal(t, uint64(1), sink.requestsCount.Load()) + assert.Eventually(t, func() bool { + return sink.requestsCount.Load() == 2 && sink.itemsCount.Load() == 15 + }, 100*time.Millisecond, 10*time.Millisecond) + }) + } +} + +func TestBatchSender_BatchExportError(t *testing.T) { + cfg := exporterbatcher.NewDefaultConfig() + cfg.MinSizeItems = 10 + tests := []struct { + name string + batcherOption Option + expectedRequests uint64 + expectedItems uint64 + }{ + { + name: "merge_only", + batcherOption: WithBatcher(cfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)), + }, + { + name: "merge_without_split_triggered", + batcherOption: func() Option { + c := cfg + c.MaxSizeItems = 200 + return WithBatcher(c, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)) + }(), + }, + { + name: "merge_with_split_triggered", + batcherOption: func() Option { + c := cfg + c.MaxSizeItems = 20 + return WithBatcher(c, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)) + }(), + expectedRequests: 1, + expectedItems: 20, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + be := queueBatchExporter(t, tt.batcherOption) + + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { + require.NoError(t, be.Shutdown(context.Background())) + }) + + sink := newFakeRequestSink() + + require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink})) + require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink})) + + // the first two requests should be blocked by the batchSender. + time.Sleep(50 * time.Millisecond) + assert.Equal(t, uint64(0), sink.requestsCount.Load()) + + // the third request should trigger the export and cause an error. + errReq := &fakeRequest{items: 20, exportErr: errors.New("transient error"), sink: sink} + require.NoError(t, be.send(context.Background(), errReq)) + + // the batch should be dropped since the queue doesn't have requeuing enabled. + assert.Eventually(t, func() bool { + return sink.requestsCount.Load() == tt.expectedRequests && + sink.itemsCount.Load() == tt.expectedItems && + be.batchSender.(*batchSender).activeRequests.Load() == uint64(0) && + be.queueSender.(*queueSender).queue.Size() == 0 + }, 100*time.Millisecond, 10*time.Millisecond) + }) + } +} + +func TestBatchSender_MergeOrSplit(t *testing.T) { + cfg := exporterbatcher.NewDefaultConfig() + cfg.MinSizeItems = 5 + cfg.MaxSizeItems = 10 + cfg.FlushTimeout = 100 * time.Millisecond + be := queueBatchExporter(t, WithBatcher(cfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) + + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { + require.NoError(t, be.Shutdown(context.Background())) + }) + + sink := newFakeRequestSink() + + // should be sent right away by reaching the minimum items size. + require.NoError(t, be.send(context.Background(), &fakeRequest{items: 8, sink: sink})) + assert.Eventually(t, func() bool { + return sink.requestsCount.Load() == 1 && sink.itemsCount.Load() == 8 + }, 50*time.Millisecond, 10*time.Millisecond) + + // big request should be broken down into two requests, both are sent right away. + require.NoError(t, be.send(context.Background(), &fakeRequest{items: 17, sink: sink})) + + assert.Eventually(t, func() bool { + return sink.requestsCount.Load() == 3 && sink.itemsCount.Load() == 25 + }, 50*time.Millisecond, 10*time.Millisecond) + + // request that cannot be split should be dropped. + require.NoError(t, be.send(context.Background(), &fakeRequest{items: 11, sink: sink, + mergeErr: errors.New("split error")})) + + // big request should be broken down into two requests, both are sent right away. + require.NoError(t, be.send(context.Background(), &fakeRequest{items: 13, sink: sink})) + + assert.Eventually(t, func() bool { + return sink.requestsCount.Load() == 5 && sink.itemsCount.Load() == 38 + }, 50*time.Millisecond, 10*time.Millisecond) + + fmt.Println("TestBatchSender_MergeOrSplit") +} + +func TestBatchSender_Shutdown(t *testing.T) { + batchCfg := exporterbatcher.NewDefaultConfig() + batchCfg.MinSizeItems = 10 + be := queueBatchExporter(t, WithBatcher(batchCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) + + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + + sink := newFakeRequestSink() + require.NoError(t, be.send(context.Background(), &fakeRequest{items: 3, sink: sink})) + + // To make the request reached the batchSender before shutdown. + time.Sleep(50 * time.Millisecond) + + require.NoError(t, be.Shutdown(context.Background())) + + // shutdown should force sending the batch + assert.Equal(t, uint64(1), sink.requestsCount.Load()) + assert.Equal(t, uint64(3), sink.itemsCount.Load()) +} + +func TestBatchSender_Disabled(t *testing.T) { + cfg := exporterbatcher.NewDefaultConfig() + cfg.Enabled = false + cfg.MaxSizeItems = 10 + be, err := newBaseExporter(defaultSettings, "", newNoopObsrepSender, + WithBatcher(cfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) + require.NotNil(t, be) + require.NoError(t, err) + + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { + require.NoError(t, be.Shutdown(context.Background())) + }) + + sink := newFakeRequestSink() + // should be sent right away because batching is disabled. + require.NoError(t, be.send(context.Background(), &fakeRequest{items: 8, sink: sink})) + assert.Equal(t, uint64(1), sink.requestsCount.Load()) + assert.Equal(t, uint64(8), sink.itemsCount.Load()) +} + +func TestBatchSender_InvalidMergeSplitFunc(t *testing.T) { + invalidMergeSplitFunc := func(_ context.Context, _ exporterbatcher.MaxSizeConfig, _ Request, req2 Request) ([]Request, + error) { + // reply with invalid 0 length slice if req2 is more than 20 items + if req2.(*fakeRequest).items > 20 { + return []Request{}, nil + } + // otherwise reply with a single request. + return []Request{req2}, nil + } + cfg := exporterbatcher.NewDefaultConfig() + cfg.FlushTimeout = 50 * time.Millisecond + cfg.MaxSizeItems = 20 + be := queueBatchExporter(t, WithBatcher(cfg, WithRequestBatchFuncs(fakeBatchMergeFunc, invalidMergeSplitFunc))) + + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { + require.NoError(t, be.Shutdown(context.Background())) + }) + + sink := newFakeRequestSink() + // first request should be ignored due to invalid merge/split function. + require.NoError(t, be.send(context.Background(), &fakeRequest{items: 30, sink: sink})) + // second request should be sent after reaching the timeout. + require.NoError(t, be.send(context.Background(), &fakeRequest{items: 15, sink: sink})) + assert.Eventually(t, func() bool { + return sink.requestsCount.Load() == 1 && sink.itemsCount.Load() == 15 + }, 100*time.Millisecond, 10*time.Millisecond) +} + +func TestBatchSender_PostShutdown(t *testing.T) { + be, err := newBaseExporter(defaultSettings, "", newNoopObsrepSender, + WithBatcher(exporterbatcher.NewDefaultConfig(), WithRequestBatchFuncs(fakeBatchMergeFunc, + fakeBatchMergeSplitFunc))) + require.NotNil(t, be) + require.NoError(t, err) + assert.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + assert.NoError(t, be.Shutdown(context.Background())) + + // Closed batch sender should act as a pass-through to not block queue draining. + sink := newFakeRequestSink() + require.NoError(t, be.send(context.Background(), &fakeRequest{items: 8, sink: sink})) + assert.Equal(t, uint64(1), sink.requestsCount.Load()) + assert.Equal(t, uint64(8), sink.itemsCount.Load()) +} + +func TestBatchSender_ConcurrencyLimitReached(t *testing.T) { + qCfg := exporterqueue.NewDefaultConfig() + qCfg.NumConsumers = 2 + be, err := newBaseExporter(defaultSettings, "", newNoopObsrepSender, + WithBatcher(exporterbatcher.NewDefaultConfig(), WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)), + WithRequestQueue(qCfg, exporterqueue.NewMemoryQueueFactory[Request]())) + require.NotNil(t, be) + require.NoError(t, err) + assert.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { + assert.NoError(t, be.Shutdown(context.Background())) + }) + + sink := newFakeRequestSink() + assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 8, sink: sink})) + + time.Sleep(50 * time.Millisecond) + // the first request should be still in-flight. + assert.Equal(t, uint64(0), sink.requestsCount.Load()) + + // the second request should be sent by reaching max concurrency limit. + assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 8, sink: sink})) + + assert.Eventually(t, func() bool { + return sink.requestsCount.Load() == 1 && sink.itemsCount.Load() == 16 + }, 100*time.Millisecond, 10*time.Millisecond) +} + +func TestBatchSender_BatchBlocking(t *testing.T) { + bCfg := exporterbatcher.NewDefaultConfig() + bCfg.MinSizeItems = 3 + be, err := newBaseExporter(defaultSettings, "", newNoopObsrepSender, + WithBatcher(bCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) + require.NotNil(t, be) + require.NoError(t, err) + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + + sink := newFakeRequestSink() + + // send 6 blocking requests + wg := sync.WaitGroup{} + for i := 0; i < 6; i++ { + wg.Add(1) + go func() { + assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 1, sink: sink, delay: 10 * time.Millisecond})) + wg.Done() + }() + } + wg.Wait() + + // should be sent in two batches since the batch size is 3 + assert.Equal(t, uint64(2), sink.requestsCount.Load()) + assert.Equal(t, uint64(6), sink.itemsCount.Load()) + + require.NoError(t, be.Shutdown(context.Background())) +} + +// Validate that the batch is cancelled once the first request in the request is cancelled +func TestBatchSender_BatchCancelled(t *testing.T) { + bCfg := exporterbatcher.NewDefaultConfig() + bCfg.MinSizeItems = 2 + be, err := newBaseExporter(defaultSettings, "", newNoopObsrepSender, + WithBatcher(bCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) + require.NotNil(t, be) + require.NoError(t, err) + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + + sink := newFakeRequestSink() + + // send 2 blocking requests + wg := sync.WaitGroup{} + ctx, cancel := context.WithCancel(context.Background()) + wg.Add(1) + go func() { + assert.ErrorIs(t, be.send(ctx, &fakeRequest{items: 1, sink: sink, delay: 100 * time.Millisecond}), context.Canceled) + wg.Done() + }() + wg.Add(1) + go func() { + time.Sleep(20 * time.Millisecond) // ensure this call is the second + assert.ErrorIs(t, be.send(context.Background(), &fakeRequest{items: 1, sink: sink, delay: 100 * time.Millisecond}), context.Canceled) + wg.Done() + }() + cancel() // canceling the first request should cancel the whole batch + wg.Wait() + + // nothing should be delivered + assert.Equal(t, uint64(0), sink.requestsCount.Load()) + assert.Equal(t, uint64(0), sink.itemsCount.Load()) + + require.NoError(t, be.Shutdown(context.Background())) +} + +func TestBatchSender_DrainActiveRequests(t *testing.T) { + bCfg := exporterbatcher.NewDefaultConfig() + bCfg.MinSizeItems = 2 + be, err := newBaseExporter(defaultSettings, "", newNoopObsrepSender, + WithBatcher(bCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) + require.NotNil(t, be) + require.NoError(t, err) + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + + sink := newFakeRequestSink() + + // send 3 blocking requests with a timeout + go func() { + assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 1, sink: sink, delay: 40 * time.Millisecond})) + }() + go func() { + assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 1, sink: sink, delay: 40 * time.Millisecond})) + }() + go func() { + assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 1, sink: sink, delay: 40 * time.Millisecond})) + }() + + // give time for the first two requests to be batched + time.Sleep(20 * time.Millisecond) + + // Shutdown should force the active batch to be dispatched and wait for all batches to be delivered. + // It should take 120 milliseconds to complete. + require.NoError(t, be.Shutdown(context.Background())) + + assert.Equal(t, uint64(2), sink.requestsCount.Load()) + assert.Equal(t, uint64(3), sink.itemsCount.Load()) +} + +func queueBatchExporter(t *testing.T, batchOption Option) *baseExporter { + be, err := newBaseExporter(defaultSettings, "", newNoopObsrepSender, batchOption, + WithRequestQueue(exporterqueue.NewDefaultConfig(), exporterqueue.NewMemoryQueueFactory[Request]())) + require.NotNil(t, be) + require.NoError(t, err) + return be +} diff --git a/exporter/exporterhelper/common.go b/exporter/exporterhelper/common.go index 509570960e8..ef7bb123025 100644 --- a/exporter/exporterhelper/common.go +++ b/exporter/exporterhelper/common.go @@ -13,6 +13,7 @@ import ( "go.opentelemetry.io/collector/config/configretry" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/exporter" + "go.opentelemetry.io/collector/exporter/exporterbatcher" "go.opentelemetry.io/collector/exporter/exporterqueue" ) @@ -138,6 +139,36 @@ func WithCapabilities(capabilities consumer.Capabilities) Option { } } +// BatcherOption apply changes to batcher sender. +type BatcherOption func(*batchSender) + +// WithRequestBatchFuncs sets the functions for merging and splitting batches for an exporter built for custom request types. +func WithRequestBatchFuncs(mf exporterbatcher.BatchMergeFunc[Request], msf exporterbatcher.BatchMergeSplitFunc[Request]) BatcherOption { + return func(bs *batchSender) { + bs.mergeFunc = mf + bs.mergeSplitFunc = msf + } +} + +// WithBatcher enables batching for an exporter based on custom request types. +// For now, it can be used only with the New[Traces|Metrics|Logs]RequestExporter exporter helpers and +// WithRequestBatchFuncs provided. +// TODO: Add OTLP-based batch functions applied by default so it can be used with New[Traces|Metrics|Logs]Exporter exporter helpers. +// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +func WithBatcher(cfg exporterbatcher.Config, opts ...BatcherOption) Option { + return func(o *baseExporter) { + bs := newBatchSender(cfg, o.set) + for _, opt := range opts { + opt(bs) + } + if bs.mergeFunc == nil || bs.mergeSplitFunc == nil { + panic("WithRequestBatchFuncs must be provided for the batcher applied to the request-based exporters") + } + o.batchSender = bs + } +} + // withMarshaler is used to set the request marshaler for the new exporter helper. // It must be provided as the first option when creating a new exporter helper. func withMarshaler(marshaler exporterqueue.Marshaler[Request]) Option { @@ -172,6 +203,7 @@ type baseExporter struct { // Chain of senders that the exporter helper applies before passing the data to the actual exporter. // The data is handled by each sender in the respective order starting from the queueSender. // Most of the senders are optional, and initialized with a no-op path-through sender. + batchSender requestSender queueSender requestSender obsrepSender requestSender retrySender requestSender @@ -189,6 +221,7 @@ func newBaseExporter(set exporter.CreateSettings, signal component.DataType, osf be := &baseExporter{ signal: signal, + batchSender: &baseRequestSender{}, queueSender: &baseRequestSender{}, obsrepSender: osf(obsReport), retrySender: &baseRequestSender{}, @@ -203,6 +236,13 @@ func newBaseExporter(set exporter.CreateSettings, signal component.DataType, osf } be.connectSenders() + // If queue sender is enabled assign to the batch sender the same number of workers. + if qs, ok := be.queueSender.(*queueSender); ok { + if bs, ok := be.batchSender.(*batchSender); ok { + bs.concurrencyLimit = uint64(qs.numConsumers) + } + } + return be, nil } @@ -218,7 +258,8 @@ func (be *baseExporter) send(ctx context.Context, req Request) error { // connectSenders connects the senders in the predefined order. func (be *baseExporter) connectSenders() { - be.queueSender.setNextSender(be.obsrepSender) + be.queueSender.setNextSender(be.batchSender) + be.batchSender.setNextSender(be.obsrepSender) be.obsrepSender.setNextSender(be.retrySender) be.retrySender.setNextSender(be.timeoutSender) } @@ -229,7 +270,12 @@ func (be *baseExporter) Start(ctx context.Context, host component.Host) error { return err } - // If no error then start the queueSender. + // If no error then start the batchSender. + if err := be.batchSender.Start(ctx, host); err != nil { + return err + } + + // Last start the queueSender. return be.queueSender.Start(ctx, host) } @@ -237,6 +283,8 @@ func (be *baseExporter) Shutdown(ctx context.Context) error { return multierr.Combine( // First shutdown the retry sender, so the queue sender can flush the queue without retries. be.retrySender.Shutdown(ctx), + // Then shutdown the batch sender + be.batchSender.Shutdown(ctx), // Then shutdown the queue sender. be.queueSender.Shutdown(ctx), // Last shutdown the wrapped exporter itself. diff --git a/exporter/exporterhelper/queue_sender.go b/exporter/exporterhelper/queue_sender.go index 3092c60c98f..3e539d7f9a0 100644 --- a/exporter/exporterhelper/queue_sender.go +++ b/exporter/exporterhelper/queue_sender.go @@ -31,7 +31,9 @@ var ( type QueueSettings struct { // Enabled indicates whether to not enqueue batches before sending to the consumerSender. Enabled bool `mapstructure:"enabled"` - // NumConsumers is the number of consumers from the queue. + // NumConsumers is the number of consumers from the queue. Defaults to 10. + // If batching is enabled, a combined batch cannot contain more requests than the number of consumers. + // So it's recommended to set higher number of consumers if batching is enabled. NumConsumers int `mapstructure:"num_consumers"` // QueueSize is the maximum number of batches allowed in queue at a given time. QueueSize int `mapstructure:"queue_size"` @@ -73,6 +75,7 @@ type queueSender struct { baseRequestSender fullName string queue exporterqueue.Queue[Request] + numConsumers int traceAttribute attribute.KeyValue logger *zap.Logger meter otelmetric.Meter @@ -87,6 +90,7 @@ func newQueueSender(q exporterqueue.Queue[Request], set exporter.CreateSettings, qs := &queueSender{ fullName: set.ID.String(), queue: q, + numConsumers: numConsumers, traceAttribute: attribute.String(obsmetrics.ExporterKey, set.ID.String()), logger: set.TelemetrySettings.Logger, meter: set.TelemetrySettings.MeterProvider.Meter(scopeName), diff --git a/exporter/exporterhelper/request_test.go b/exporter/exporterhelper/request_test.go index 18e8228f946..fe373c67e12 100644 --- a/exporter/exporterhelper/request_test.go +++ b/exporter/exporterhelper/request_test.go @@ -5,25 +5,117 @@ package exporterhelper import ( "context" + "sync/atomic" + "time" + "go.opentelemetry.io/collector/exporter/exporterbatcher" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" ) +type fakeRequestSink struct { + requestsCount *atomic.Uint64 + itemsCount *atomic.Uint64 +} + +func newFakeRequestSink() *fakeRequestSink { + return &fakeRequestSink{ + requestsCount: &atomic.Uint64{}, + itemsCount: &atomic.Uint64{}, + } +} + type fakeRequest struct { - items int - err error + items int + exportErr error + mergeErr error + delay time.Duration + sink *fakeRequestSink } -func (r fakeRequest) Export(context.Context) error { - return r.err +func (r *fakeRequest) Export(ctx context.Context) error { + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(r.delay): + } + if r.exportErr != nil { + return r.exportErr + } + if r.sink != nil { + r.sink.requestsCount.Add(1) + r.sink.itemsCount.Add(uint64(r.items)) + } + return nil } -func (r fakeRequest) ItemsCount() int { +func (r *fakeRequest) ItemsCount() int { return r.items } +func fakeBatchMergeFunc(_ context.Context, r1 Request, r2 Request) (Request, error) { + if r1 == nil { + return r2, nil + } + fr1 := r1.(*fakeRequest) + fr2 := r2.(*fakeRequest) + if fr2.mergeErr != nil { + return nil, fr2.mergeErr + } + return &fakeRequest{ + items: fr1.items + fr2.items, + sink: fr1.sink, + exportErr: fr2.exportErr, + delay: fr1.delay + fr2.delay, + }, nil +} + +func fakeBatchMergeSplitFunc(ctx context.Context, cfg exporterbatcher.MaxSizeConfig, r1 Request, r2 Request) ([]Request, error) { + maxItems := cfg.MaxSizeItems + if maxItems == 0 { + r, err := fakeBatchMergeFunc(ctx, r1, r2) + return []Request{r}, err + } + + if r2.(*fakeRequest).mergeErr != nil { + return nil, r2.(*fakeRequest).mergeErr + } + + fr2 := r2.(*fakeRequest) + fr2 = &fakeRequest{items: fr2.items, sink: fr2.sink, exportErr: fr2.exportErr, delay: fr2.delay} + var res []Request + + // fill fr1 to maxItems if it's not nil + if r1 != nil { + fr1 := r1.(*fakeRequest) + fr1 = &fakeRequest{items: fr1.items, sink: fr1.sink, exportErr: fr1.exportErr, delay: fr1.delay} + if fr2.items <= maxItems-fr1.items { + fr1.items += fr2.items + if fr2.exportErr != nil { + fr1.exportErr = fr2.exportErr + } + return []Request{fr1}, nil + } + // if split is needed, we don't propagate exportErr from fr2 to fr1 to test more cases + fr2.items -= maxItems - fr1.items + fr1.items = maxItems + res = append(res, fr1) + } + + // split fr2 to maxItems + for { + if fr2.items <= maxItems { + res = append(res, &fakeRequest{items: fr2.items, sink: fr2.sink, exportErr: fr2.exportErr, delay: fr2.delay}) + break + } + res = append(res, &fakeRequest{items: maxItems, sink: fr2.sink, exportErr: fr2.exportErr, delay: fr2.delay}) + fr2.items -= maxItems + } + + return res, nil +} + type fakeRequestConverter struct { metricsError error tracesError error @@ -32,13 +124,13 @@ type fakeRequestConverter struct { } func (frc *fakeRequestConverter) requestFromMetricsFunc(_ context.Context, md pmetric.Metrics) (Request, error) { - return fakeRequest{items: md.DataPointCount(), err: frc.requestError}, frc.metricsError + return &fakeRequest{items: md.DataPointCount(), exportErr: frc.requestError}, frc.metricsError } func (frc *fakeRequestConverter) requestFromTracesFunc(_ context.Context, md ptrace.Traces) (Request, error) { - return fakeRequest{items: md.SpanCount(), err: frc.requestError}, frc.tracesError + return &fakeRequest{items: md.SpanCount(), exportErr: frc.requestError}, frc.tracesError } func (frc *fakeRequestConverter) requestFromLogsFunc(_ context.Context, md plog.Logs) (Request, error) { - return fakeRequest{items: md.LogRecordCount(), err: frc.requestError}, frc.logsError + return &fakeRequest{items: md.LogRecordCount(), exportErr: frc.requestError}, frc.logsError }