From f9aa0de0764513452a7a0bcd0484d1aa3026a5a5 Mon Sep 17 00:00:00 2001 From: Shiranka Miskin Date: Mon, 13 Mar 2023 06:45:05 +0000 Subject: [PATCH] changefeedccl: fix kafka messagetoolarge test failure Fixes: #93847 This fixes the following bug in the TestChangefeedKafkaMessageTooLarge test setup: 1. The feed starts sending messages, randomly triggering a MessageTooLarge error causing a retry with a smaller batch size 2. Eventually, while the retrying process is still ongoing, all 2000 rows are successfully received by the mock kafka sink, causing assertPayloads to complete, causing the test to closeFeed and run CANCEL on the changefeed. 3. The retrying process gets stuck in sendMessage where it can't send the message to the feedCh which has been closed since the changefeed is trying to close, but it also can't exit on the mock sink's tg.done since that only closes after the feed fully closes, which requires the retrying process to end. Release note: None --- pkg/ccl/changefeedccl/testfeed_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/ccl/changefeedccl/testfeed_test.go b/pkg/ccl/changefeedccl/testfeed_test.go index 41eae24843d2..8e9f1b8e16e5 100644 --- a/pkg/ccl/changefeedccl/testfeed_test.go +++ b/pkg/ccl/changefeedccl/testfeed_test.go @@ -1342,6 +1342,7 @@ func (s *fakeKafkaSink) Dial() error { } select { case s.feedCh <- m: + case <-kafka.stopWorkerCh: case <-s.tg.done: } return nil