Skip to content

Commit

Permalink
changefeedccl: block testfeed closure on cancel status
Browse files Browse the repository at this point in the history
TestChangefeedNemeses would sometimes flake on cloudstorage sink likely
due to the feed writing files during the tempdirectory closing.  Stress
testing it on cloudstorage sink failed at 160 executions.

This small change blocks feed.Close() on actually seeing the "cancelled"
job status to ensure the feed is completely stopped prior to Close
completing.

Release note: None
  • Loading branch information
samiskin committed Jun 27, 2022
1 parent 3acda7c commit b91b729
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 2 deletions.
8 changes: 6 additions & 2 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3208,15 +3208,19 @@ func TestChangefeedJobUpdateFailsIfNotClaimed(t *testing.T) {
sqlDB.Exec(t, `INSERT INTO foo (a, b) VALUES (1, 1)`)

cf := feed(t, f, "CREATE CHANGEFEED FOR TABLE foo")
defer closeFeed(t, cf)
jobID := cf.(cdctest.EnterpriseTestFeed).JobID()
defer func() {
// Manually update job status to avoid closeFeed waitng for the registry to cancel it
sqlDB.Exec(t, `UPDATE system.jobs SET status = $1 WHERE id = $2`, jobs.StatusFailed, jobID)
closeFeed(t, cf)
}()

assertPayloads(t, cf, []string{
`foo: [1]->{"after": {"a": 1, "b": 1}}`,
})

// Mimic the claim dying and being cleaned up by
// another node.
jobID := cf.(cdctest.EnterpriseTestFeed).JobID()
sqlDB.Exec(t, `UPDATE system.jobs SET claim_session_id = NULL WHERE id = $1`, jobID)

// Expect that the distflow fails since it can't
Expand Down
9 changes: 9 additions & 0 deletions pkg/ccl/changefeedccl/testfeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,8 +423,17 @@ func (f *jobFeed) Close() error {
close(f.shutdown)
return nil
}
if status == string(jobs.StatusFailed) {
f.mu.Lock()
defer f.mu.Unlock()
f.mu.terminalErr = errors.New("changefeed failed")
close(f.shutdown)
return nil
}
if _, err := f.db.Exec(`CANCEL JOB $1`, f.jobID); err != nil {
log.Infof(context.Background(), `could not cancel feed %d: %v`, f.jobID, err)
} else {
return f.WaitForStatus(func(s jobs.Status) bool { return s == jobs.StatusCanceled })
}
}

Expand Down

0 comments on commit b91b729

Please sign in to comment.