From 25d9fb70c7f3ee6beea0628fcb4908ab1e9678e6 Mon Sep 17 00:00:00 2001 From: Dmitry Anoshin Date: Wed, 29 May 2024 14:47:25 -0700 Subject: [PATCH] [exporterhelper] Fix potential deadlocks in BatcherSender shutdown Fixes https://github.com/open-telemetry/opentelemetry-collector/issues/10255 --- .../fix-batcher-sender-shutdown-deadlock.yaml | 20 ++++++++ exporter/exporterhelper/batch_sender.go | 10 +++- exporter/exporterhelper/batch_sender_test.go | 51 +++++++++++++++++++ 3 files changed, 79 insertions(+), 2 deletions(-) create mode 100644 .chloggen/fix-batcher-sender-shutdown-deadlock.yaml diff --git a/.chloggen/fix-batcher-sender-shutdown-deadlock.yaml b/.chloggen/fix-batcher-sender-shutdown-deadlock.yaml new file mode 100644 index 00000000000..76baecac6b5 --- /dev/null +++ b/.chloggen/fix-batcher-sender-shutdown-deadlock.yaml @@ -0,0 +1,20 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: exporterhelper + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Fix potential deadlocks in BatcherSender shutdown + +# One or more tracking issues or pull requests related to the change +issues: [10255] + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/exporter/exporterhelper/batch_sender.go b/exporter/exporterhelper/batch_sender.go index 29065bfe980..2370aba54c9 100644 --- a/exporter/exporterhelper/batch_sender.go +++ b/exporter/exporterhelper/batch_sender.go @@ -118,6 +118,12 @@ func (bs *batchSender) exportActiveBatch() { bs.activeBatch = newEmptyBatch() } +func (bs *batchSender) resetTimer() { + if !bs.stopped.Load() { + bs.resetTimerCh <- struct{}{} + } +} + // isActiveBatchReady returns true if the active batch is ready to be exported. // The batch is ready if it has reached the minimum size or the concurrency limit is reached. // Caller must hold the lock. @@ -154,7 +160,7 @@ func (bs *batchSender) sendMergeSplitBatch(ctx context.Context, req Request) err batch := bs.activeBatch if bs.isActiveBatchReady() || len(reqs) > 1 { bs.exportActiveBatch() - bs.resetTimerCh <- struct{}{} + bs.resetTimer() } bs.mu.Unlock() <-batch.done @@ -194,7 +200,7 @@ func (bs *batchSender) sendMergeBatch(ctx context.Context, req Request) error { batch := bs.activeBatch if bs.isActiveBatchReady() { bs.exportActiveBatch() - bs.resetTimerCh <- struct{}{} + bs.resetTimer() } bs.mu.Unlock() <-batch.done diff --git a/exporter/exporterhelper/batch_sender_test.go b/exporter/exporterhelper/batch_sender_test.go index bc5720a99f8..e6a7f1f93d6 100644 --- a/exporter/exporterhelper/batch_sender_test.go +++ b/exporter/exporterhelper/batch_sender_test.go @@ -439,6 +439,57 @@ func TestBatchSender_WithBatcherOption(t *testing.T) { } } +// TestBatchSender_ShutdownDeadlock tests that the exporter does not deadlock when shutting down while a batch is being +// merged. +func TestBatchSender_ShutdownDeadlock(t *testing.T) { + blockMerge := make(chan struct{}) + + // blockedBatchMergeFunc blocks until the blockMerge channel is closed + blockedBatchMergeFunc := func(_ context.Context, r1 Request, _ Request) (Request, error) { + <-blockMerge + return r1, nil + } + + bCfg := exporterbatcher.NewDefaultConfig() + bCfg.FlushTimeout = 10 * time.Minute // high timeout to avoid the timeout to trigger + be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, + WithBatcher(bCfg, WithRequestBatchFuncs(blockedBatchMergeFunc, fakeBatchMergeSplitFunc))) + require.NoError(t, err) + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + + sink := newFakeRequestSink() + + // Send 10 concurrent requests and wait for them to start + startWG := sync.WaitGroup{} + for i := 0; i < 10; i++ { + startWG.Add(1) + go func() { + startWG.Done() + require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink})) + }() + } + startWG.Wait() + + // Initiate the exporter shutdown, unblock the batch merge function to catch possible deadlocks, + // then wait for the exporter to finish. + startShutdown := make(chan struct{}) + doneShutdown := make(chan struct{}) + go func() { + close(startShutdown) + require.Nil(t, be.Shutdown(context.Background())) + close(doneShutdown) + }() + <-startShutdown + close(blockMerge) + <-doneShutdown + + // The exporter should have sent only one "merged" batch + assert.Equal(t, uint64(1), sink.requestsCount.Load()) + + // blockedBatchMergeFunc just returns the first request, so the items count should be 4 + assert.Equal(t, uint64(4), sink.itemsCount.Load()) +} + func queueBatchExporter(t *testing.T, batchOption Option) *baseExporter { be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, batchOption, WithRequestQueue(exporterqueue.NewDefaultConfig(), exporterqueue.NewMemoryQueueFactory[Request]()))