Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

schemachanger: Deflake TestConcurrentSchemaChanges #107365

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions pkg/sql/schemachanger/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ go_test(
"//pkg/sql",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/desctestutils",
"//pkg/sql/execinfra",
"//pkg/sql/pgwire/pgcode",
"//pkg/sql/rowenc",
"//pkg/sql/schemachanger/scexec",
Expand All @@ -59,7 +58,6 @@ go_test(
"//pkg/sql/tests",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/skip",
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util",
Expand All @@ -68,7 +66,6 @@ go_test(
"//pkg/util/log",
"//pkg/util/protoutil",
"//pkg/util/randutil",
"//pkg/util/syncutil",
"@com_github_cockroachdb_cockroach_go_v2//crdb",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_errors//errorspb",
Expand Down
139 changes: 47 additions & 92 deletions pkg/sql/schemachanger/schemachanger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scexec"
Expand All @@ -41,14 +40,12 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/tests"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/errors/errorspb"
"github.com/lib/pq"
Expand All @@ -61,125 +58,83 @@ import (
// schema changes operating on the same descriptors are performed serially.
func TestConcurrentDeclarativeSchemaChanges(t *testing.T) {
defer leaktest.AfterTest(t)()
skip.WithIssue(t, 106732, "flaky test")
defer log.Scope(t).Close(t)
ctx := context.Background()

ctx, cancel := context.WithCancel(context.Background())
// `finished` stores all finished schema changes in sequential order.
var finished []string

var maxValue = 4000
if util.RaceEnabled {
// Race builds are a lot slower, so use a smaller number of rows.
maxValue = 200
}
createIndexChan := make(chan struct{})
var createIndexChanClose sync.Once
alterPKChan := make(chan struct{})
var alterPKChanClose sync.Once

// Protects backfillNotification.
var mu syncutil.Mutex
var backfillNotification, continueNotification chan struct{}
// We have to have initBackfillNotification return the new
// channel rather than having later users read the original
// backfillNotification to make the race detector happy.
initBackfillNotification := func() (chan struct{}, chan struct{}) {
mu.Lock()
defer mu.Unlock()
backfillNotification = make(chan struct{})
continueNotification = make(chan struct{})
return backfillNotification, continueNotification
}
notifyBackfill := func() {
mu.Lock()
defer mu.Unlock()
if backfillNotification != nil {
close(backfillNotification)
backfillNotification = nil
}
if continueNotification != nil {
<-continueNotification
close(continueNotification)
continueNotification = nil
}
}
var alterPrimaryKeyBlockedCounter atomic.Uint32
params, _ := tests.CreateTestServerParams()
params.Knobs = base.TestingKnobs{
SQLDeclarativeSchemaChanger: &scexec.TestingKnobs{
AfterWaitingForConcurrentSchemaChanges: func(stmts []string, wasBlocked bool) {
for _, stmt := range stmts {
if wasBlocked && strings.Contains(stmt, "ALTER PRIMARY KEY") {
alterPrimaryKeyBlockedCounter.Add(1)
return
}
AfterStage: func(p scplan.Plan, stageIdx int) error {
if stageIdx == len(p.Stages)-1 {
// This is after the last stage of the plan.
require.Equal(t, 1, len(p.Statements))
finished = append(finished, p.Statements[0].StatementTag)
}
return nil
},
},
DistSQL: &execinfra.TestingKnobs{
RunBeforeBackfillChunk: func(_ roachpb.Span) error {
notifyBackfill()
RunBeforeBackfill: func() error {
createIndexChanClose.Do(func() {
close(createIndexChan)
})
<-alterPKChan // alterPkChan is closed after initial block but it's okay to read from a closed channel.
return nil
},
BeforeWaitingForConcurrentSchemaChanges: func(stmts []string) {
if len(stmts) == 1 && strings.Contains(stmts[0], "ALTER TABLE") {
// Unblock `CREATE INDEX`.
alterPKChanClose.Do(func() {
close(alterPKChan)
})
}
},
},
// Decrease the adopt loop interval so that retries happen quickly.
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
// Prevent the GC job from running so we ensure that all the keys
// which were written remain.
GCJob: &sql.GCJobTestingKnobs{RunBeforeResume: func(jobID jobspb.JobID) error {
<-ctx.Done()
return ctx.Err()
}},
}
s, sqlDB, kvDB := serverutils.StartServer(t, params)
defer s.Stopper().Stop(ctx)
defer cancel()

if _, err := sqlDB.Exec(`CREATE DATABASE t`); err != nil {
t.Fatal(err)
}
tdb := sqlutils.MakeSQLRunner(sqlDB)

if _, err := sqlDB.Exec(`CREATE TABLE t.test (k INT NOT NULL, v INT)`); err != nil {
t.Fatal(err)
var maxValue = 4000
if util.RaceEnabled {
// Race builds are a lot slower, so use a smaller number of rows.
maxValue = 200
}
tdb.Exec(t, `CREATE DATABASE t;`)
tdb.Exec(t, `CREATE TABLE t.test (k INT PRIMARY KEY, v INT NOT NULL);`)
if err := sqltestutils.BulkInsertIntoTable(sqlDB, maxValue); err != nil {
t.Fatal(err)
}

backfillNotif, continueNotif := initBackfillNotification()
// Use two channels to ensure:
// 1. CREATE INDEX is being executed before issuing ALTER TABLE, and
// 2. when executing ALTER TABLE, CREATE INDEX is still ongoing.
var wg sync.WaitGroup
wg.Add(1)
wg.Add(2)
go func() {
if _, err := sqlDB.Exec(`CREATE INDEX i ON t.test (v)`); err != nil {
t.Error(err)
}
tdb.Exec(t, `CREATE INDEX test_j_idx ON t.test (v);`)
wg.Done()
}()

// Wait until the create index schema change job has started
// before kicking off the alter primary key.
<-backfillNotif
require.Zero(t, alterPrimaryKeyBlockedCounter.Load())
wg.Add(1)
<-createIndexChan
go func() {
if _, err := sqlDB.Exec(`ALTER TABLE t.test ALTER PRIMARY KEY USING COLUMNS (k)`); err != nil {
t.Error(err)
}
tdb.Exec(t, `ALTER TABLE t.test ALTER PRIMARY KEY USING COLUMNS (v);`)
wg.Done()
}()

// Unblock the create index job.
continueNotif <- struct{}{}
// Assert both schema changes eventually finishes in their issuing order.
wg.Wait()

// The ALTER PRIMARY KEY schema change must have been blocked.
require.NotZero(t, alterPrimaryKeyBlockedCounter.Load())

// There should be 5 k/v pairs per row:
// * the original primary index keyed on rowid,
// * the first version of the secondary index on v with rowid as key suffix,
// * the intermediate version of the new primary index keyed on k but
// including rowid as a stored column,
// * the final version of the new primary index keyed on k and not including rowid,
// * the final version of the secondary index for v with k as the key suffix.
testutils.SucceedsSoon(t, func() error {
return sqltestutils.CheckTableKeyCount(ctx, kvDB, 5, maxValue)
})
require.Equal(t, []string{"CREATE INDEX", "ALTER TABLE"}, finished)
// There should be 3 unGC'ed kv pairs per row:
// 1. new secondary index keyed on `v` (from CREATE INDEX);
// 2. new primary index keyed on `v` (from ALTER PRIMARY KEY);
// 3. new unique secondary index keyed on `k` (from ALTER PRIMARY KEY);
sqltestutils.CheckTableKeyCount(ctx, kvDB, 3, maxValue)
}

func TestSchemaChangeWaitsForOtherSchemaChanges(t *testing.T) {
Expand Down