[exporterhelper] Fix potential deadlocks in BatcherSender shutdown (#…
dmitryax authored May 30, 2024
1 parent 10bcef3 commit ff6f029
Showing 3 changed files with 107 additions and 20 deletions.
20 changes: 20 additions & 0 deletions .chloggen/fix-batcher-sender-shutdown-deadlock.yaml
@@ -0,0 +1,20 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: bug_fix
+
+# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
+component: exporterhelper
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Fix potential deadlocks in BatcherSender shutdown
+
+# One or more tracking issues or pull requests related to the change
+issues: [10255]
+
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: [user]
50 changes: 30 additions & 20 deletions exporter/exporterhelper/batch_sender.go
@@ -40,22 +40,24 @@ type batchSender struct {
 
 	logger *zap.Logger
 
-	shutdownCh chan struct{}
-	stopped    *atomic.Bool
+	shutdownCh         chan struct{}
+	shutdownCompleteCh chan struct{}
+	stopped            *atomic.Bool
 }
 
 // newBatchSender returns a new batch consumer component.
 func newBatchSender(cfg exporterbatcher.Config, set exporter.CreateSettings,
 	mf exporterbatcher.BatchMergeFunc[Request], msf exporterbatcher.BatchMergeSplitFunc[Request]) *batchSender {
 	bs := &batchSender{
-		activeBatch:    newEmptyBatch(),
-		cfg:            cfg,
-		logger:         set.Logger,
-		mergeFunc:      mf,
-		mergeSplitFunc: msf,
-		shutdownCh:     make(chan struct{}),
-		stopped:        &atomic.Bool{},
-		resetTimerCh:   make(chan struct{}),
+		activeBatch:        newEmptyBatch(),
+		cfg:                cfg,
+		logger:             set.Logger,
+		mergeFunc:          mf,
+		mergeSplitFunc:     msf,
+		shutdownCh:         make(chan struct{}),
+		shutdownCompleteCh: make(chan struct{}),
+		stopped:            &atomic.Bool{},
+		resetTimerCh:       make(chan struct{}),
 	}
 	return bs
 }
@@ -66,14 +68,19 @@ func (bs *batchSender) Start(_ context.Context, _ component.Host) error {
 		for {
 			select {
 			case <-bs.shutdownCh:
-				bs.mu.Lock()
-				if bs.activeBatch.request != nil {
-					bs.exportActiveBatch()
+				// There is a minimal chance that another request is added after the shutdown signal.
+				// This loop will handle that case.
+				for bs.activeRequests.Load() > 0 {
+					bs.mu.Lock()
+					if bs.activeBatch.request != nil {
+						bs.exportActiveBatch()
+					}
+					bs.mu.Unlock()
 				}
-				bs.mu.Unlock()
 				if !timer.Stop() {
 					<-timer.C
 				}
+				close(bs.shutdownCompleteCh)
 				return
 			case <-timer.C:
 				bs.mu.Lock()
@@ -118,6 +125,12 @@ func (bs *batchSender) exportActiveBatch() {
 	bs.activeBatch = newEmptyBatch()
 }
 
+func (bs *batchSender) resetTimer() {
+	if !bs.stopped.Load() {
+		bs.resetTimerCh <- struct{}{}
+	}
+}
+
 // isActiveBatchReady returns true if the active batch is ready to be exported.
 // The batch is ready if it has reached the minimum size or the concurrency limit is reached.
 // Caller must hold the lock.
@@ -154,7 +167,7 @@ func (bs *batchSender) sendMergeSplitBatch(ctx context.Context, req Request) err
 		batch := bs.activeBatch
 		if bs.isActiveBatchReady() || len(reqs) > 1 {
 			bs.exportActiveBatch()
-			bs.resetTimerCh <- struct{}{}
+			bs.resetTimer()
 		}
 		bs.mu.Unlock()
 		<-batch.done
@@ -194,7 +207,7 @@ func (bs *batchSender) sendMergeBatch(ctx context.Context, req Request) error {
 	batch := bs.activeBatch
 	if bs.isActiveBatchReady() {
 		bs.exportActiveBatch()
-		bs.resetTimerCh <- struct{}{}
+		bs.resetTimer()
 	}
 	bs.mu.Unlock()
 	<-batch.done
@@ -215,9 +228,6 @@ func (bs *batchSender) updateActiveBatch(ctx context.Context, req Request) {
 func (bs *batchSender) Shutdown(context.Context) error {
 	bs.stopped.Store(true)
 	close(bs.shutdownCh)
-	// Wait for the active requests to finish.
-	for bs.activeRequests.Load() > 0 {
-		time.Sleep(10 * time.Millisecond)
-	}
+	<-bs.shutdownCompleteCh
 	return nil
 }
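
To make the synchronization easier to follow outside the diff, here is a minimal, self-contained sketch of the pattern this change adopts: Shutdown closes shutdownCh and then blocks on shutdownCompleteCh, which the flush goroutine closes once it has drained any remaining work, instead of sleeping in a polling loop; and timer resets are skipped once the sender is stopped, so nothing can block sending on resetTimerCh after its reader has exited. Apart from those field names, everything below is simplified and illustrative, not the actual exporterhelper code:

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// flusher is a stand-in for batchSender: a background goroutine flushes on a
// timer and exits on shutdown, signaling completion via shutdownCompleteCh.
type flusher struct {
	stopped            atomic.Bool
	activeRequests     atomic.Int64
	shutdownCh         chan struct{} // closed by Shutdown to stop the flush goroutine
	shutdownCompleteCh chan struct{} // closed by the flush goroutine when it is done
	resetTimerCh       chan struct{}
}

func newFlusher() *flusher {
	f := &flusher{
		shutdownCh:         make(chan struct{}),
		shutdownCompleteCh: make(chan struct{}),
		resetTimerCh:       make(chan struct{}),
	}
	go f.flushLoop(200 * time.Millisecond)
	return f
}

func (f *flusher) flushLoop(interval time.Duration) {
	timer := time.NewTimer(interval)
	for {
		select {
		case <-f.shutdownCh:
			// Drain any work that raced with the shutdown signal before reporting completion.
			for f.activeRequests.Load() > 0 {
				// flush the pending batch here
			}
			if !timer.Stop() {
				<-timer.C
			}
			close(f.shutdownCompleteCh)
			return
		case <-timer.C:
			// flush the pending batch here
			timer.Reset(interval)
		case <-f.resetTimerCh:
			if !timer.Stop() {
				<-timer.C
			}
			timer.Reset(interval)
		}
	}
}

// resetTimer is a no-op once the sender is stopped: after shutdown the flush
// goroutine may no longer be reading resetTimerCh, so an unconditional send
// could block forever.
func (f *flusher) resetTimer() {
	if !f.stopped.Load() {
		f.resetTimerCh <- struct{}{}
	}
}

// Shutdown signals the flush goroutine and waits for it to confirm completion
// instead of polling activeRequests in a sleep loop.
func (f *flusher) Shutdown() {
	f.stopped.Store(true)
	close(f.shutdownCh)
	<-f.shutdownCompleteCh
}

func main() {
	f := newFlusher()
	f.resetTimer()
	f.Shutdown()
	fmt.Println("shut down cleanly")
}

Waiting on a closed channel costs nothing and cannot miss the completion event, which is why this approach replaces the former 10 ms sleep loop in Shutdown.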
57 changes: 57 additions & 0 deletions exporter/exporterhelper/batch_sender_test.go
@@ -436,6 +436,63 @@ func TestBatchSender_WithBatcherOption(t *testing.T) {
 	}
 }
 
+// TestBatchSender_ShutdownDeadlock tests that the exporter does not deadlock when shutting down while a batch is being
+// merged.
+func TestBatchSender_ShutdownDeadlock(t *testing.T) {
+	blockMerge := make(chan struct{})
+	waitMerge := make(chan struct{}, 10)
+
+	// blockedBatchMergeFunc blocks until the blockMerge channel is closed
+	blockedBatchMergeFunc := func(_ context.Context, r1 Request, _ Request) (Request, error) {
+		waitMerge <- struct{}{}
+		<-blockMerge
+		return r1, nil
+	}
+
+	bCfg := exporterbatcher.NewDefaultConfig()
+	bCfg.FlushTimeout = 10 * time.Minute // high timeout so the flush timeout does not trigger
+	be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender,
+		WithBatcher(bCfg, WithRequestBatchFuncs(blockedBatchMergeFunc, fakeBatchMergeSplitFunc)))
+	require.NoError(t, err)
+	require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
+
+	sink := newFakeRequestSink()
+
+	// Send 10 concurrent requests and wait for them to start
+	startWG := sync.WaitGroup{}
+	for i := 0; i < 10; i++ {
+		startWG.Add(1)
+		go func() {
+			startWG.Done()
+			require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink}))
+		}()
+	}
+	startWG.Wait()
+
+	// Wait for at least one batch to enter the merge function
+	<-waitMerge
+
+	// Initiate the exporter shutdown, unblock the batch merge function to catch possible deadlocks,
+	// then wait for the exporter to finish.
+	startShutdown := make(chan struct{})
+	doneShutdown := make(chan struct{})
+	go func() {
+		close(startShutdown)
+		require.Nil(t, be.Shutdown(context.Background()))
+		close(doneShutdown)
+	}()
+	<-startShutdown
+	close(blockMerge)
+	<-doneShutdown
+
+	// The exporter should have sent only one "merged" batch; in some cases it might send two if the shutdown
+	// happens before the batch is fully merged.
+	assert.LessOrEqual(t, uint64(1), sink.requestsCount.Load())
+
+	// blockedBatchMergeFunc just returns the first request, so the items count should be 4 times the requests count.
+	assert.Equal(t, sink.requestsCount.Load()*4, sink.itemsCount.Load())
+}
+
 func queueBatchExporter(t *testing.T, batchOption Option) *baseExporter {
 	be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, batchOption,
 		WithRequestQueue(exporterqueue.NewDefaultConfig(), exporterqueue.NewMemoryQueueFactory[Request]()))
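
As a companion to the test above, the sketch below distills the coordination technique it relies on into a standalone program, without any of the exporterhelper test helpers: block an in-flight operation on a gate channel, start Shutdown on its own goroutine, release the gate, and treat a missed deadline as a likely deadlock. The names in it (blockingWorker, gate, done, work) are illustrative assumptions, not identifiers from this repository:

package main

import (
	"fmt"
	"time"
)

// blockingWorker mimics an exporter whose in-flight work can be held up on a
// gate channel, the same way blockedBatchMergeFunc blocks on blockMerge above.
type blockingWorker struct {
	gate chan struct{} // closed by the caller to let in-flight work finish
	done chan struct{} // closed by the worker goroutine once it has drained
	work chan struct{}
}

func newBlockingWorker() *blockingWorker {
	w := &blockingWorker{
		gate: make(chan struct{}),
		done: make(chan struct{}),
		work: make(chan struct{}, 1),
	}
	go func() {
		for range w.work {
			<-w.gate // simulate a merge that blocks mid-flight
		}
		close(w.done)
	}()
	return w
}

// Shutdown stops accepting work and waits for the worker goroutine to drain.
func (w *blockingWorker) Shutdown() {
	close(w.work)
	<-w.done
}

func main() {
	w := newBlockingWorker()
	w.work <- struct{}{} // one request is now blocked inside the worker

	shutdownDone := make(chan struct{})
	go func() {
		w.Shutdown() // must not deadlock even though work is still in flight
		close(shutdownDone)
	}()

	close(w.gate) // unblock the in-flight work, as the test does with blockMerge

	select {
	case <-shutdownDone:
		fmt.Println("shutdown completed")
	case <-time.After(5 * time.Second):
		fmt.Println("likely deadlock: shutdown did not complete in time")
	}
}

In the repository itself, running the test above with the race detector enabled (go test -race) exercises the real batchSender through the same sequence.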
