From ff6f029f99b30667823ea43547c099078c8f0211 Mon Sep 17 00:00:00 2001
From: Dmitrii Anoshin
Date: Thu, 30 May 2024 09:41:41 -0700
Subject: [PATCH] [exporterhelper] Fix potential deadlocks in BatcherSender shutdown (#10258)

Fixes https://github.com/open-telemetry/opentelemetry-collector/issues/10255
---
 .../fix-batcher-sender-shutdown-deadlock.yaml | 20 +++++++
 exporter/exporterhelper/batch_sender.go       | 50 +++++++++-------
 exporter/exporterhelper/batch_sender_test.go  | 57 +++++++++++++++++++
 3 files changed, 107 insertions(+), 20 deletions(-)
 create mode 100644 .chloggen/fix-batcher-sender-shutdown-deadlock.yaml

diff --git a/.chloggen/fix-batcher-sender-shutdown-deadlock.yaml b/.chloggen/fix-batcher-sender-shutdown-deadlock.yaml
new file mode 100644
index 00000000000..76baecac6b5
--- /dev/null
+++ b/.chloggen/fix-batcher-sender-shutdown-deadlock.yaml
@@ -0,0 +1,20 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: bug_fix
+
+# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
+component: exporterhelper
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Fix potential deadlocks in BatcherSender shutdown
+
+# One or more tracking issues or pull requests related to the change
+issues: [10255]
+
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: [user]
diff --git a/exporter/exporterhelper/batch_sender.go b/exporter/exporterhelper/batch_sender.go
index 29065bfe980..086c9724aa3 100644
--- a/exporter/exporterhelper/batch_sender.go
+++ b/exporter/exporterhelper/batch_sender.go
@@ -40,22 +40,24 @@ type batchSender struct {
 
 	logger *zap.Logger
 
-	shutdownCh chan struct{}
-	stopped    *atomic.Bool
+	shutdownCh         chan struct{}
+	shutdownCompleteCh chan struct{}
+	stopped            *atomic.Bool
 }
 
 // newBatchSender returns a new batch consumer component.
 func newBatchSender(cfg exporterbatcher.Config, set exporter.CreateSettings,
 	mf exporterbatcher.BatchMergeFunc[Request], msf exporterbatcher.BatchMergeSplitFunc[Request]) *batchSender {
 	bs := &batchSender{
-		activeBatch:    newEmptyBatch(),
-		cfg:            cfg,
-		logger:         set.Logger,
-		mergeFunc:      mf,
-		mergeSplitFunc: msf,
-		shutdownCh:     make(chan struct{}),
-		stopped:        &atomic.Bool{},
-		resetTimerCh:   make(chan struct{}),
+		activeBatch:        newEmptyBatch(),
+		cfg:                cfg,
+		logger:             set.Logger,
+		mergeFunc:          mf,
+		mergeSplitFunc:     msf,
+		shutdownCh:         make(chan struct{}),
+		shutdownCompleteCh: make(chan struct{}),
+		stopped:            &atomic.Bool{},
+		resetTimerCh:       make(chan struct{}),
 	}
 	return bs
 }
@@ -66,14 +68,19 @@ func (bs *batchSender) Start(_ context.Context, _ component.Host) error {
 		for {
 			select {
 			case <-bs.shutdownCh:
-				bs.mu.Lock()
-				if bs.activeBatch.request != nil {
-					bs.exportActiveBatch()
+				// There is a minimal chance that another request is added after the shutdown signal.
+				// This loop will handle that case.
+				for bs.activeRequests.Load() > 0 {
+					bs.mu.Lock()
+					if bs.activeBatch.request != nil {
+						bs.exportActiveBatch()
+					}
+					bs.mu.Unlock()
 				}
-				bs.mu.Unlock()
 				if !timer.Stop() {
 					<-timer.C
 				}
+				close(bs.shutdownCompleteCh)
 				return
 			case <-timer.C:
 				bs.mu.Lock()
@@ -118,6 +125,12 @@ func (bs *batchSender) exportActiveBatch() {
 	bs.activeBatch = newEmptyBatch()
 }
 
+func (bs *batchSender) resetTimer() {
+	if !bs.stopped.Load() {
+		bs.resetTimerCh <- struct{}{}
+	}
+}
+
 // isActiveBatchReady returns true if the active batch is ready to be exported.
 // The batch is ready if it has reached the minimum size or the concurrency limit is reached.
 // Caller must hold the lock.
@@ -154,7 +167,7 @@ func (bs *batchSender) sendMergeSplitBatch(ctx context.Context, req Request) err
 		batch := bs.activeBatch
 		if bs.isActiveBatchReady() || len(reqs) > 1 {
 			bs.exportActiveBatch()
-			bs.resetTimerCh <- struct{}{}
+			bs.resetTimer()
 		}
 		bs.mu.Unlock()
 		<-batch.done
@@ -194,7 +207,7 @@ func (bs *batchSender) sendMergeBatch(ctx context.Context, req Request) error {
 	batch := bs.activeBatch
 	if bs.isActiveBatchReady() {
 		bs.exportActiveBatch()
-		bs.resetTimerCh <- struct{}{}
+		bs.resetTimer()
 	}
 	bs.mu.Unlock()
 	<-batch.done
@@ -215,9 +228,6 @@ func (bs *batchSender) updateActiveBatch(ctx context.Context, req Request) {
 func (bs *batchSender) Shutdown(context.Context) error {
 	bs.stopped.Store(true)
 	close(bs.shutdownCh)
-	// Wait for the active requests to finish.
-	for bs.activeRequests.Load() > 0 {
-		time.Sleep(10 * time.Millisecond)
-	}
+	<-bs.shutdownCompleteCh
 	return nil
 }
diff --git a/exporter/exporterhelper/batch_sender_test.go b/exporter/exporterhelper/batch_sender_test.go
index 68396ed7c58..08ab42f89b5 100644
--- a/exporter/exporterhelper/batch_sender_test.go
+++ b/exporter/exporterhelper/batch_sender_test.go
@@ -436,6 +436,63 @@ func TestBatchSender_WithBatcherOption(t *testing.T) {
 	}
 }
 
+// TestBatchSender_ShutdownDeadlock tests that the exporter does not deadlock when shutting down while a batch is being
+// merged.
+func TestBatchSender_ShutdownDeadlock(t *testing.T) {
+	blockMerge := make(chan struct{})
+	waitMerge := make(chan struct{}, 10)
+
+	// blockedBatchMergeFunc blocks until the blockMerge channel is closed
+	blockedBatchMergeFunc := func(_ context.Context, r1 Request, _ Request) (Request, error) {
+		waitMerge <- struct{}{}
+		<-blockMerge
+		return r1, nil
+	}
+
+	bCfg := exporterbatcher.NewDefaultConfig()
+	bCfg.FlushTimeout = 10 * time.Minute // high timeout so that the flush timeout does not trigger
+	be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender,
+		WithBatcher(bCfg, WithRequestBatchFuncs(blockedBatchMergeFunc, fakeBatchMergeSplitFunc)))
+	require.NoError(t, err)
+	require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
+
+	sink := newFakeRequestSink()
+
+	// Send 10 concurrent requests and wait for them to start
+	startWG := sync.WaitGroup{}
+	for i := 0; i < 10; i++ {
+		startWG.Add(1)
+		go func() {
+			startWG.Done()
+			require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink}))
+		}()
+	}
+	startWG.Wait()
+
+	// Wait for at least one batch to enter the merge function
+	<-waitMerge
+
+	// Initiate the exporter shutdown, unblock the batch merge function to catch possible deadlocks,
+	// then wait for the exporter to finish.
+	startShutdown := make(chan struct{})
+	doneShutdown := make(chan struct{})
+	go func() {
+		close(startShutdown)
+		require.Nil(t, be.Shutdown(context.Background()))
+		close(doneShutdown)
+	}()
+	<-startShutdown
+	close(blockMerge)
+	<-doneShutdown
+
+	// The exporter should have sent only one "merged" batch; in some cases it might send two if the shutdown
+	// happens before the batch is fully merged.
+	assert.LessOrEqual(t, uint64(1), sink.requestsCount.Load())
+
+	// blockedBatchMergeFunc just returns the first request, so the items count should be 4 times the requests count.
+	assert.Equal(t, sink.requestsCount.Load()*4, sink.itemsCount.Load())
+}
+
 func queueBatchExporter(t *testing.T, batchOption Option) *baseExporter {
 	be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, batchOption,
 		WithRequestQueue(exporterqueue.NewDefaultConfig(), exporterqueue.NewMemoryQueueFactory[Request]()))
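
Editor's note: the core of the fix is replacing the polling loop in Shutdown (sleep until activeRequests reaches zero) with a completion channel that the background goroutine closes after draining any remaining batch. The standalone Go sketch below illustrates only that synchronization pattern under simplified assumptions; the worker, enqueue, and flush names are illustrative and are not part of the collector's exporterhelper API.

// A minimal sketch of the completion-channel shutdown pattern the patch adopts.
package main

import (
	"fmt"
	"sync/atomic"
)

type worker struct {
	jobs               chan string
	active             atomic.Int64
	shutdownCh         chan struct{}
	shutdownCompleteCh chan struct{}
}

func newWorker() *worker {
	w := &worker{
		jobs:               make(chan string, 16),
		shutdownCh:         make(chan struct{}),
		shutdownCompleteCh: make(chan struct{}),
	}
	go w.run()
	return w
}

func (w *worker) run() {
	for {
		select {
		case <-w.shutdownCh:
			// Drain whatever was enqueued before (or racing with) shutdown,
			// then signal that the shutdown-side work is fully done.
			for w.active.Load() > 0 {
				select {
				case job := <-w.jobs:
					w.flush(job)
				default:
				}
			}
			close(w.shutdownCompleteCh)
			return
		case job := <-w.jobs:
			w.flush(job)
		}
	}
}

func (w *worker) flush(job string) {
	fmt.Println("flushed:", job)
	w.active.Add(-1)
}

// enqueue registers a job as active before handing it to the worker goroutine.
func (w *worker) enqueue(job string) {
	w.active.Add(1)
	w.jobs <- job
}

// Shutdown signals the worker and blocks until the worker confirms it has
// drained, instead of sleeping in a polling loop (the pattern the patch removes).
func (w *worker) Shutdown() {
	close(w.shutdownCh)
	<-w.shutdownCompleteCh
}

func main() {
	w := newWorker()
	w.enqueue("batch-1")
	w.enqueue("batch-2")
	w.Shutdown()
}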
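The new regression test leans on internal helpers (newBaseExporter, fakeRequest, and friends). As a rough outline of the same idea in plain Go, block the in-flight work on a channel, start Shutdown in a goroutine, unblock the work, and fail the test if Shutdown never returns. The sketch below assumes a hypothetical blockingExporter stand-in; it is not a collector type.

// A reduced skeleton of a shutdown-deadlock regression test.
package example

import (
	"testing"
	"time"
)

// blockingExporter simulates a component whose in-flight work is held open by
// the test until unblock is closed.
type blockingExporter struct {
	unblock chan struct{}
	done    chan struct{}
}

func newBlockingExporter() *blockingExporter {
	e := &blockingExporter{
		unblock: make(chan struct{}),
		done:    make(chan struct{}),
	}
	go func() {
		<-e.unblock // the simulated in-flight request
		close(e.done)
	}()
	return e
}

// Shutdown waits for the in-flight work to finish, mirroring the
// completion-channel pattern introduced by the patch.
func (e *blockingExporter) Shutdown() { <-e.done }

func TestShutdownDoesNotDeadlock(t *testing.T) {
	e := newBlockingExporter()

	shutdownStarted := make(chan struct{})
	shutdownDone := make(chan struct{})
	go func() {
		close(shutdownStarted)
		e.Shutdown()
		close(shutdownDone)
	}()

	// Unblock the in-flight work only once shutdown is underway, the ordering
	// most likely to expose a deadlock.
	<-shutdownStarted
	close(e.unblock)

	select {
	case <-shutdownDone:
		// Shutdown returned: no deadlock.
	case <-time.After(5 * time.Second):
		t.Fatal("Shutdown did not return; possible deadlock")
	}
}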