diff --git a/pkg/server/autoconfig/acprovider/provider.go b/pkg/server/autoconfig/acprovider/provider.go index bdb6ddd869ae..0cd35eee351a 100644 --- a/pkg/server/autoconfig/acprovider/provider.go +++ b/pkg/server/autoconfig/acprovider/provider.go @@ -26,8 +26,9 @@ type Provider interface { // initial event immediately. EnvUpdate() <-chan struct{} - // ActiveEnvironments returns the IDs of environments that have - // tasks available for execution. + // ActiveEnvironments returns the IDs of environments that + // have tasks available for execution. A nil or empty array is + // returned when there are no more tasks to run. ActiveEnvironments() []autoconfigpb.EnvironmentID // Peek will block waiting for the first task the provider believes diff --git a/pkg/server/autoconfig/auto_config_test.go b/pkg/server/autoconfig/auto_config_test.go index 5c233f02396c..1c2395d32f92 100644 --- a/pkg/server/autoconfig/auto_config_test.go +++ b/pkg/server/autoconfig/auto_config_test.go @@ -12,12 +12,16 @@ package autoconfig_test import ( "context" + gosql "database/sql" + "math" "testing" + "time" "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/security/username" + "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/server/autoconfig" "github.com/cockroachdb/cockroach/pkg/server/autoconfig/acprovider" "github.com/cockroachdb/cockroach/pkg/server/autoconfig/autoconfigpb" @@ -45,6 +49,15 @@ type testTask struct { seen bool } +var endProfileTask = autoconfigpb.Task{ + TaskID: autoconfigpb.TaskID(math.MaxUint64), + Description: "end of configuration profile", + MinVersion: clusterversion.TestingBinaryVersion, + Payload: &autoconfigpb.Task_SimpleSQL{ + SimpleSQL: &autoconfigpb.SimpleSQL{}, + }, +} + var testTasks = []testTask{ {task: autoconfigpb.Task{ TaskID: 123, @@ -85,6 +98,7 @@ var testTasks = []testTask{ }, }, }}, + {task: endProfileTask}, } func (p *testProvider) EnvUpdate() <-chan struct{} { @@ -93,7 +107,12 @@ func (p *testProvider) EnvUpdate() <-chan struct{} { } func (p *testProvider) ActiveEnvironments() []autoconfigpb.EnvironmentID { - return []autoconfigpb.EnvironmentID{testEnvID} + p.Lock() + defer p.Unlock() + if len(p.tasks) > 0 { + return []autoconfigpb.EnvironmentID{testEnvID} + } + return nil } func (p *testProvider) Pop( @@ -161,6 +180,7 @@ func TestAutoConfig(t *testing.T) { provider.notifyCh <- struct{}{} ctx := context.Background() + noWait := time.Duration(0) s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{ DefaultTestTenant: base.TestIsForStuffThatShouldWorkWithSecondaryTenantsButDoesntYet(109462), @@ -169,6 +189,9 @@ func TestAutoConfig(t *testing.T) { AutoConfig: &autoconfig.TestingKnobs{ Provider: provider, }, + Server: &server.TestingKnobs{ + AutoConfigProfileStartupWaitTime: &noWait, + }, }, }) defer s.Stopper().Stop(ctx) @@ -225,3 +248,108 @@ SELECT value FROM system.job_info WHERE job_id = $1 AND info_key = $2 LIMIT 1`, err = sqlDB.QueryRowContext(ctx, `SELECT count(*) FROM system.bar`).Scan(&unused) require.NoError(t, err) } + +func TestAutoConfigWaitAtStartupTimesOut(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + provider := &testProvider{ + t: t, + notifyCh: make(chan struct{}, 1), + peekWaitCh: make(chan struct{}), + tasks: []testTask{ + {task: autoconfigpb.Task{ + TaskID: 123, + Description: "test task that takes longer than the maxWait", + MinVersion: clusterversion.TestingBinaryVersion, + Payload: &autoconfigpb.Task_SimpleSQL{ + SimpleSQL: &autoconfigpb.SimpleSQL{ + UsernameProto: username.NodeUserName().EncodeProto(), + TransactionalStatements: []string{ + "SELECT pg_sleep(300)", + }, + }, + }, + }}, + {task: endProfileTask}, + }, + } + provider.notifyCh <- struct{}{} + close(provider.peekWaitCh) + + ctx := context.Background() + shortWait := 50 * time.Millisecond + s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{ + DefaultTestTenant: base.TestIsForStuffThatShouldWorkWithSecondaryTenantsButDoesntYet(109462), + Knobs: base.TestingKnobs{ + JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), + AutoConfig: &autoconfig.TestingKnobs{ + Provider: provider, + }, + Server: &server.TestingKnobs{ + AutoConfigProfileStartupWaitTime: &shortWait, + }, + }, + }) + defer s.Stopper().Stop(ctx) + + // To keep us honest, assert that the task hasn't been + // completed even though server startup has returned. + taskRef := autoconfig.InfoKeyTaskRef{Environment: testEnvID, Task: provider.tasks[0].task.TaskID} + completionMarker := taskRef.EncodeCompletionMarkerKey() + var result bool + err := sqlDB.QueryRowContext(ctx, `SELECT true FROM system.job_info WHERE job_id = $1 AND info_key = $2 LIMIT 1`, + jobs.AutoConfigRunnerJobID, + completionMarker).Scan(&result) + require.Error(t, err, gosql.ErrNoRows) +} + +func TestAutoConfigWaitAtStartup(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + provider := &testProvider{ + t: t, + notifyCh: make(chan struct{}, 1), + peekWaitCh: make(chan struct{}), + tasks: []testTask{ + {task: autoconfigpb.Task{ + TaskID: 123, + Description: "create-test-table", + MinVersion: clusterversion.TestingBinaryVersion, + Payload: &autoconfigpb.Task_SimpleSQL{ + SimpleSQL: &autoconfigpb.SimpleSQL{ + UsernameProto: username.NodeUserName().EncodeProto(), + TransactionalStatements: []string{ + "SELECT pg_sleep(1)", + "CREATE TABLE system.t (pk int primary key)", + "INSERT INTO system.t VALUES (1)", + }, + }, + }, + }}, + {task: endProfileTask}, + }, + } + provider.notifyCh <- struct{}{} + close(provider.peekWaitCh) + + ctx := context.Background() + s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{ + DefaultTestTenant: base.TestIsForStuffThatShouldWorkWithSecondaryTenantsButDoesntYet(109462), + Knobs: base.TestingKnobs{ + JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), + AutoConfig: &autoconfig.TestingKnobs{ + Provider: provider, + }, + }, + }) + defer s.Stopper().Stop(ctx) + + // The side-effects of the profile execution should be + // immediately visible. + var value int + err := sqlDB.QueryRow("SELECT pk FROM system.t").Scan(&value) + require.NoError(t, err) + require.Equal(t, 1, value) +} diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index f00944b072e1..eee2afc113e4 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -1744,9 +1744,55 @@ func (s *SQLServer) preStart( } } + s.waitForActiveAutoConfigEnvironments(ctx) + return nil } +// waitForActiveAutoConfigEnvironments waits until the set of +// ActiveEnvironments is empty. ActiveEnvironments is empty once there +// are no more tasks to run. +// +// This is sufficient to ensure all configuration task jobs have +// completed becuase the environment runner only enqueues a task after +// the previous task has completed and configuration profiles include +// an "end task" that runs after all previous tasks. +func (s *SQLServer) waitForActiveAutoConfigEnvironments(ctx context.Context) { + maxWait := 2 * time.Minute + serverKnobs := s.cfg.TestingKnobs.Server + if serverKnobs != nil && serverKnobs.(*TestingKnobs).AutoConfigProfileStartupWaitTime != nil { + maxWait = *serverKnobs.(*TestingKnobs).AutoConfigProfileStartupWaitTime + } + + if maxWait == 0 { + log.Infof(ctx, "waiting for auto-configuration environments disabled") + return + } + + envs := s.execCfg.AutoConfigProvider.ActiveEnvironments() + if len(envs) == 0 { + log.Infof(ctx, "auto-configuration environments not set or already complete") + return + } + + log.Infof(ctx, "waiting up to %s for auto-configuration environments %v to complete", maxWait, envs) + ctx, cancel := context.WithTimeout(ctx, maxWait) // nolint:context + defer cancel() + retryCfg := retry.Options{ + InitialBackoff: 100 * time.Millisecond, + MaxBackoff: 5 * time.Second, + } + waitStart := timeutil.Now() + for i := retry.StartWithCtx(ctx, retryCfg); i.Next(); { + envs := s.execCfg.AutoConfigProvider.ActiveEnvironments() + if len(envs) == 0 { + log.Infof(ctx, "auto-configuration environments reported no active tasks after %s", timeutil.Since(waitStart)) + return + } + } + log.Warningf(ctx, "auto-configuration environments still running after %s, moving on", timeutil.Since(waitStart)) +} + // startCheckService verifies that the tenant has the right // service mode initially, then starts an async checker // to stop the server if the service mode changes. diff --git a/pkg/server/testing_knobs.go b/pkg/server/testing_knobs.go index c5831fa7f945..c37e212a7e3c 100644 --- a/pkg/server/testing_knobs.go +++ b/pkg/server/testing_knobs.go @@ -172,6 +172,11 @@ type TestingKnobs struct { // // TODO(ahmad/healthy-pod): Remove this once `v23.2` is cut and update `TestTenantAutoUpgrade` // to reflect the changes. AllowTenantAutoUpgradeOnInternalVersionChanges bool + + // If non-nil, AutoConfigProfileStartupWaitTime is used when + // waiting for any active configuration environments to + // complete their tasks. + AutoConfigProfileStartupWaitTime *time.Duration } // ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.