diff --git a/pkg/sql/schema_change_plan_node.go b/pkg/sql/schema_change_plan_node.go index dfd4750cb3ac..f232d9b66e7c 100644 --- a/pkg/sql/schema_change_plan_node.go +++ b/pkg/sql/schema_change_plan_node.go @@ -181,8 +181,8 @@ func (p *planner) waitForDescriptorSchemaChanges( ctx context.Context, descID descpb.ID, scs SchemaChangerState, ) error { - if knobs := p.ExecCfg().DeclarativeSchemaChangerTestingKnobs; knobs != nil && - knobs.BeforeWaitingForConcurrentSchemaChanges != nil { + knobs := p.ExecCfg().DeclarativeSchemaChangerTestingKnobs + if knobs != nil && knobs.BeforeWaitingForConcurrentSchemaChanges != nil { knobs.BeforeWaitingForConcurrentSchemaChanges(scs.stmts) } @@ -196,7 +196,6 @@ func (p *planner) waitForDescriptorSchemaChanges( // Wait for the descriptor to no longer be claimed by a schema change. start := timeutil.Now() logEvery := log.Every(10 * time.Second) - var wasBlocked bool for r := retry.StartWithCtx(ctx, base.DefaultRetryOptions()); r.Next(); { now := p.ExecCfg().Clock.Now() var isBlocked bool @@ -215,9 +214,7 @@ func (p *planner) waitForDescriptorSchemaChanges( }); err != nil { return err } - if isBlocked { - wasBlocked = true - } else { + if !isBlocked { break } if logEvery.ShouldLog() { @@ -226,11 +223,9 @@ func (p *planner) waitForDescriptorSchemaChanges( " waited %v so far", descID, timeutil.Since(start), ) } - } - - if knobs := p.ExecCfg().DeclarativeSchemaChangerTestingKnobs; knobs != nil && - knobs.AfterWaitingForConcurrentSchemaChanges != nil { - knobs.AfterWaitingForConcurrentSchemaChanges(scs.stmts, wasBlocked) + if knobs != nil && knobs.WhileWaitingForConcurrentSchemaChanges != nil { + knobs.WhileWaitingForConcurrentSchemaChanges(scs.stmts) + } } log.Infof( diff --git a/pkg/sql/schemachanger/scexec/testing_knobs.go b/pkg/sql/schemachanger/scexec/testing_knobs.go index 684318390ece..bd3d7fd65f20 100644 --- a/pkg/sql/schemachanger/scexec/testing_knobs.go +++ b/pkg/sql/schemachanger/scexec/testing_knobs.go @@ -27,9 +27,9 @@ type TestingKnobs struct { // for concurrent schema changes to finish. BeforeWaitingForConcurrentSchemaChanges func(stmts []string) - // AfterWaitingForConcurrentSchemaChanges is called at the end of waiting + // WhileWaitingForConcurrentSchemaChanges is called while waiting // for concurrent schema changes to finish. - AfterWaitingForConcurrentSchemaChanges func(stmts []string, wasBlocked bool) + WhileWaitingForConcurrentSchemaChanges func(stmts []string) // OnPostCommitPlanError is called whenever the schema changer job returns an // error on building the state or on planning the stages. diff --git a/pkg/sql/schemachanger/schemachanger_test.go b/pkg/sql/schemachanger/schemachanger_test.go index 0c5736519533..3cec12adce61 100644 --- a/pkg/sql/schemachanger/schemachanger_test.go +++ b/pkg/sql/schemachanger/schemachanger_test.go @@ -100,9 +100,9 @@ func TestConcurrentDeclarativeSchemaChanges(t *testing.T) { params, _ := tests.CreateTestServerParams() params.Knobs = base.TestingKnobs{ SQLDeclarativeSchemaChanger: &scexec.TestingKnobs{ - AfterWaitingForConcurrentSchemaChanges: func(stmts []string, wasBlocked bool) { + WhileWaitingForConcurrentSchemaChanges: func(stmts []string) { for _, stmt := range stmts { - if wasBlocked && strings.Contains(stmt, "ADD COLUMN") { + if strings.Contains(stmt, "ADD COLUMN") { addColumnBlockedCounter.Add(1) return } @@ -161,6 +161,14 @@ func TestConcurrentDeclarativeSchemaChanges(t *testing.T) { wg.Done() }() + // The ADD COLUMN schema change must block. + testutils.SucceedsSoon(t, func() error { + if addColumnBlockedCounter.Load() == 0 { + return errors.New("waiting for concurrent schema change to block") + } + return nil + }) + // Unblock the create index job. continueNotif <- struct{}{} wg.Wait()