Skip to content

Commit

Permalink
changfeedccl: deflake TestAlterChangefeedTelemetry
Browse files Browse the repository at this point in the history
The job system clears the lease asyncronously after cancelation. This
lease clearing transaction can cause a restart in the alter changefeed
transaction, which will lead to different feature counter
counts. Thus, we want to wait for the lease clear. However, the lease
clear isn't guaranteed to happen, so we only wait a few seconds for
it.

Release note: None
  • Loading branch information
stevendanna committed Nov 15, 2022
1 parent d596620 commit a4ab9b6
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 17 deletions.
1 change: 0 additions & 1 deletion pkg/ccl/changefeedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,6 @@ go_test(
"//pkg/sql/types",
"//pkg/storage",
"//pkg/testutils",
"//pkg/testutils/jobutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/skip",
"//pkg/testutils/sqlutils",
Expand Down
28 changes: 25 additions & 3 deletions pkg/ccl/changefeedccl/alter_changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/tests"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/jobutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
Expand Down Expand Up @@ -423,11 +422,34 @@ func TestAlterChangefeedTelemetry(t *testing.T) {

testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo, bar WITH diff`)
defer closeFeed(t, testFeed)

feed := testFeed.(cdctest.EnterpriseTestFeed)

require.NoError(t, feed.Pause())
jobutils.WaitForJobToHaveNoLease(t, sqlDB, feed.JobID())

// The job system clears the lease asyncronously after
// cancellation. This lease clearing transaction can
// cause a restart in the alter changefeed
// transaction, which will lead to different feature
// counter counts. Thus, we want to wait for the lease
// clear. However, the lease clear isn't guaranteed to
// happen, so we only wait a few seconds for it.
waitForNoLease := func() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
for {
if ctx.Err() != nil {
return
}
var sessionID []byte
sqlDB.QueryRow(t, `SELECT claim_session_id FROM system.jobs WHERE id = $1`, feed.JobID()).Scan(&sessionID)
if sessionID == nil {
return
}
time.Sleep(250 * time.Millisecond)
}
}

waitForNoLease()
sqlDB.Exec(t, fmt.Sprintf(`ALTER CHANGEFEED %d DROP bar, foo ADD baz UNSET diff SET resolved, format=json`, feed.JobID()))

counts := telemetry.GetFeatureCounts(telemetry.Raw, telemetry.ResetCounts)
Expand Down
13 changes: 0 additions & 13 deletions pkg/testutils/jobutils/jobs_verification.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,19 +89,6 @@ func waitForJobToHaveStatus(
}, 2*time.Minute)
}

func WaitForJobToHaveNoLease(t testing.TB, db *sqlutils.SQLRunner, jobID jobspb.JobID) {
t.Helper()
testutils.SucceedsWithin(t, func() error {
var sessionID []byte
var instanceID gosql.NullInt64
db.QueryRow(t, `SELECT claim_session_id, claim_instance_id FROM system.jobs WHERE id = $1`, jobID).Scan(&sessionID, &instanceID)
if sessionID == nil && !instanceID.Valid {
return nil
}
return errors.Newf("job %d still has claim information", jobID)
}, 2*time.Minute)
}

// RunJob runs the provided job control statement, initializing, notifying and
// closing the chan at the passed pointer (see below for why) and returning the
// jobID and error result. PAUSE JOB and CANCEL JOB are racy in that it's hard
Expand Down

0 comments on commit a4ab9b6

Please sign in to comment.