diff --git a/.chloggen/fix_batch_sender_chaining.yaml b/.chloggen/fix_batch_sender_chaining.yaml new file mode 100644 index 00000000000..55dece860fc --- /dev/null +++ b/.chloggen/fix_batch_sender_chaining.yaml @@ -0,0 +1,20 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: exporterhelper + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Fix a bug when the retry and timeout logic was not applied with enabled batching. + +# One or more tracking issues or pull requests related to the change +issues: [10166] + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/exporter/exporterhelper/batch_sender.go b/exporter/exporterhelper/batch_sender.go index 086c9724aa3..d6903f9ef28 100644 --- a/exporter/exporterhelper/batch_sender.go +++ b/exporter/exporterhelper/batch_sender.go @@ -119,7 +119,7 @@ func newEmptyBatch() *batch { // Caller must hold the lock. func (bs *batchSender) exportActiveBatch() { go func(b *batch) { - b.err = b.request.Export(b.ctx) + b.err = bs.nextSender.send(b.ctx, b.request) close(b.done) }(bs.activeBatch) bs.activeBatch = newEmptyBatch() diff --git a/exporter/exporterhelper/batch_sender_test.go b/exporter/exporterhelper/batch_sender_test.go index bc59c0d63c5..6dea3c7dcef 100644 --- a/exporter/exporterhelper/batch_sender_test.go +++ b/exporter/exporterhelper/batch_sender_test.go @@ -483,6 +483,50 @@ func TestBatchSender_ShutdownDeadlock(t *testing.T) { assert.EqualValues(t, 8, sink.itemsCount.Load()) } +func TestBatchSenderWithTimeout(t *testing.T) { + bCfg := exporterbatcher.NewDefaultConfig() + bCfg.MinSizeItems = 10 + bCfg.MaxSizeItems = 10 + tCfg := NewDefaultTimeoutSettings() + tCfg.Timeout = 50 * time.Microsecond + be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, + WithBatcher(bCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)), + WithTimeout(tCfg)) + require.NoError(t, err) + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + + sink := newFakeRequestSink() + + // Send 3 concurrent requests that should be merged in two batched + wg := sync.WaitGroup{} + for i := 0; i < 3; i++ { + wg.Add(1) + go func() { + require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink})) + wg.Done() + }() + } + wg.Wait() + assert.EqualValues(t, 2, sink.requestsCount.Load()) + assert.EqualValues(t, 12, sink.itemsCount.Load()) + + // 3 requests with a delay must be cancelled by the timeout sender + for i := 0; i < 3; i++ { + wg.Add(1) + go func() { + assert.Error(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink, delay: 30 * time.Millisecond})) + wg.Done() + }() + } + wg.Wait() + + assert.NoError(t, be.Shutdown(context.Background())) + + // The sink should not change + assert.EqualValues(t, 2, sink.requestsCount.Load()) + assert.EqualValues(t, 12, sink.itemsCount.Load()) +} + func queueBatchExporter(t *testing.T, batchOption Option) *baseExporter { be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, batchOption, WithRequestQueue(exporterqueue.NewDefaultConfig(), exporterqueue.NewMemoryQueueFactory[Request]()))