Skip to content

Commit

Permalink
schemachanger: deflake TestConcurrentDeclarativeSchemaChanges
Browse files Browse the repository at this point in the history
This commit deflakes this test by checking that the second schema change
actually does block because of the first one, rather than checking that
it has blocked. The bug was that the latter wasn't always guaranteed to
happen because we didn't force the schema changes to run in parallel.

Fixes cockroachdb#106732.

Release note: None
  • Loading branch information
Marius Posta committed Jul 27, 2023
1 parent 839428e commit 572486c
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 15 deletions.
17 changes: 6 additions & 11 deletions pkg/sql/schema_change_plan_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,8 @@ func (p *planner) waitForDescriptorSchemaChanges(
ctx context.Context, descID descpb.ID, scs SchemaChangerState,
) error {

if knobs := p.ExecCfg().DeclarativeSchemaChangerTestingKnobs; knobs != nil &&
knobs.BeforeWaitingForConcurrentSchemaChanges != nil {
knobs := p.ExecCfg().DeclarativeSchemaChangerTestingKnobs
if knobs != nil && knobs.BeforeWaitingForConcurrentSchemaChanges != nil {
knobs.BeforeWaitingForConcurrentSchemaChanges(scs.stmts)
}

Expand All @@ -123,7 +123,6 @@ func (p *planner) waitForDescriptorSchemaChanges(
// Wait for the descriptor to no longer be claimed by a schema change.
start := timeutil.Now()
logEvery := log.Every(10 * time.Second)
var wasBlocked bool
for r := retry.StartWithCtx(ctx, base.DefaultRetryOptions()); r.Next(); {
now := p.ExecCfg().Clock.Now()
var isBlocked bool
Expand All @@ -146,9 +145,7 @@ func (p *planner) waitForDescriptorSchemaChanges(
}); err != nil {
return err
}
if isBlocked {
wasBlocked = true
} else {
if !isBlocked {
break
}
if logEvery.ShouldLog() {
Expand All @@ -157,11 +154,9 @@ func (p *planner) waitForDescriptorSchemaChanges(
" waited %v so far", descID, timeutil.Since(start),
)
}
}

if knobs := p.ExecCfg().DeclarativeSchemaChangerTestingKnobs; knobs != nil &&
knobs.AfterWaitingForConcurrentSchemaChanges != nil {
knobs.AfterWaitingForConcurrentSchemaChanges(scs.stmts, wasBlocked)
if knobs != nil && knobs.WhileWaitingForConcurrentSchemaChanges != nil {
knobs.WhileWaitingForConcurrentSchemaChanges(scs.stmts)
}
}

log.Infof(
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/schemachanger/scexec/testing_knobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ type TestingKnobs struct {
// for concurrent schema changes to finish.
BeforeWaitingForConcurrentSchemaChanges func(stmts []string)

// AfterWaitingForConcurrentSchemaChanges is called at the end of waiting
// WhileWaitingForConcurrentSchemaChanges is called while waiting
// for concurrent schema changes to finish.
AfterWaitingForConcurrentSchemaChanges func(stmts []string, wasBlocked bool)
WhileWaitingForConcurrentSchemaChanges func(stmts []string)

// OnPostCommitPlanError is called whenever the schema changer job returns an
// error on building the state or on planning the stages.
Expand Down
12 changes: 10 additions & 2 deletions pkg/sql/schemachanger/schemachanger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,9 @@ func TestConcurrentDeclarativeSchemaChanges(t *testing.T) {
params, _ := tests.CreateTestServerParams()
params.Knobs = base.TestingKnobs{
SQLDeclarativeSchemaChanger: &scexec.TestingKnobs{
AfterWaitingForConcurrentSchemaChanges: func(stmts []string, wasBlocked bool) {
WhileWaitingForConcurrentSchemaChanges: func(stmts []string) {
for _, stmt := range stmts {
if wasBlocked && strings.Contains(stmt, "ADD COLUMN") {
if strings.Contains(stmt, "ADD COLUMN") {
addColumnBlockedCounter.Add(1)
return
}
Expand Down Expand Up @@ -161,6 +161,14 @@ func TestConcurrentDeclarativeSchemaChanges(t *testing.T) {
wg.Done()
}()

// The ADD COLUMN schema change must block.
testutils.SucceedsSoon(t, func() error {
if addColumnBlockedCounter.Load() == 0 {
return errors.New("waiting for concurrent schema change to block")
}
return nil
})

// Unblock the create index job.
continueNotif <- struct{}{}
wg.Wait()
Expand Down

0 comments on commit 572486c

Please sign in to comment.