From adf2fb9a5c80e2ec29b231c9e0f92f4a7dfb26e3 Mon Sep 17 00:00:00 2001 From: timn Date: Wed, 29 May 2024 13:39:25 -0400 Subject: [PATCH] unit test for batchsender deadlock --- exporter/exporterhelper/batch_sender_test.go | 44 ++++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/exporter/exporterhelper/batch_sender_test.go b/exporter/exporterhelper/batch_sender_test.go index bc5720a99f8..492b98a1d58 100644 --- a/exporter/exporterhelper/batch_sender_test.go +++ b/exporter/exporterhelper/batch_sender_test.go @@ -205,6 +205,50 @@ func TestBatchSender_Shutdown(t *testing.T) { assert.Equal(t, uint64(3), sink.itemsCount.Load()) } +func TestBatchSender_Shutdown_deadlock(t *testing.T) { + batchCfg := exporterbatcher.NewDefaultConfig() + batchCfg.MinSizeItems = 10 + + waitBatchMergeFunc := func(_ context.Context, r1 Request, r2 Request) (Request, error) { + // sleep to hold onto the lock to simulate slow merge until shutdown is triggered + // shutdown ticket should be blocked on shutdown case to get hold of the lock. + time.Sleep(300 * time.Millisecond) + if r1 == nil { + return r2, nil + } + fr1 := r1.(*fakeRequest) + fr2 := r2.(*fakeRequest) + if fr2.mergeErr != nil { + return nil, fr2.mergeErr + } + return &fakeRequest{ + items: fr1.items + fr2.items, + sink: fr1.sink, + exportErr: fr2.exportErr, + delay: fr1.delay + fr2.delay, + }, nil + } + + be := queueBatchExporter(t, WithBatcher(batchCfg, WithRequestBatchFuncs(waitBatchMergeFunc, fakeBatchMergeSplitFunc))) + + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + + sink := newFakeRequestSink() + for i := 0; i < 10; i++ { + require.NoError(t, be.send(context.Background(), &fakeRequest{items: 3, sink: sink})) + } + + // To make the request reached the batchSender before shutdown. + time.Sleep(50 * time.Millisecond) + + require.NoError(t, be.Shutdown(context.Background())) + fmt.Println("TestBatchSender_Shutdown") + // wait for the channels to be in shutdown + // shutdown should force sending the batch + assert.Equal(t, uint64(1), sink.requestsCount.Load()) + assert.Equal(t, uint64(3), sink.itemsCount.Load()) +} + func TestBatchSender_Disabled(t *testing.T) { cfg := exporterbatcher.NewDefaultConfig() cfg.Enabled = false