From b91b7294e960a8009f8f66da4769db9687fc17bd Mon Sep 17 00:00:00 2001 From: Shiranka Miskin Date: Wed, 22 Jun 2022 21:21:30 +0000 Subject: [PATCH] changefeedccl: block testfeed closure on cancel status TestChangefeedNemeses would sometimes flake on cloudstorage sink likely due to the feed writing files during the tempdirectory closing. Stress testing it on cloudstorage sink failed at 160 executions. This small change blocks feed.Close() on actually seeing the "cancelled" job status to ensure the feed is completely stopped prior to Close completing. Release note: None --- pkg/ccl/changefeedccl/changefeed_test.go | 8 ++++++-- pkg/ccl/changefeedccl/testfeed_test.go | 9 +++++++++ 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index c5833f38dbf2..99e36dc6f214 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -3208,7 +3208,12 @@ func TestChangefeedJobUpdateFailsIfNotClaimed(t *testing.T) { sqlDB.Exec(t, `INSERT INTO foo (a, b) VALUES (1, 1)`) cf := feed(t, f, "CREATE CHANGEFEED FOR TABLE foo") - defer closeFeed(t, cf) + jobID := cf.(cdctest.EnterpriseTestFeed).JobID() + defer func() { + // Manually update job status to avoid closeFeed waitng for the registry to cancel it + sqlDB.Exec(t, `UPDATE system.jobs SET status = $1 WHERE id = $2`, jobs.StatusFailed, jobID) + closeFeed(t, cf) + }() assertPayloads(t, cf, []string{ `foo: [1]->{"after": {"a": 1, "b": 1}}`, @@ -3216,7 +3221,6 @@ func TestChangefeedJobUpdateFailsIfNotClaimed(t *testing.T) { // Mimic the claim dying and being cleaned up by // another node. - jobID := cf.(cdctest.EnterpriseTestFeed).JobID() sqlDB.Exec(t, `UPDATE system.jobs SET claim_session_id = NULL WHERE id = $1`, jobID) // Expect that the distflow fails since it can't diff --git a/pkg/ccl/changefeedccl/testfeed_test.go b/pkg/ccl/changefeedccl/testfeed_test.go index 6bb055244668..f2b02bbd6887 100644 --- a/pkg/ccl/changefeedccl/testfeed_test.go +++ b/pkg/ccl/changefeedccl/testfeed_test.go @@ -423,8 +423,17 @@ func (f *jobFeed) Close() error { close(f.shutdown) return nil } + if status == string(jobs.StatusFailed) { + f.mu.Lock() + defer f.mu.Unlock() + f.mu.terminalErr = errors.New("changefeed failed") + close(f.shutdown) + return nil + } if _, err := f.db.Exec(`CANCEL JOB $1`, f.jobID); err != nil { log.Infof(context.Background(), `could not cancel feed %d: %v`, f.jobID, err) + } else { + return f.WaitForStatus(func(s jobs.Status) bool { return s == jobs.StatusCanceled }) } }