Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
114840: server: delay job scheduler startup r=adityamaru a=stevendanna

We previously moved external storage init after the sqlServer preStart
to ensure we have a SQL instance ID. Unfortunately, sqlServer.preStart
also started the job scheduler, which may immediately execute a job
that assumes that external storage has been initialized.

This is a minimal fix intended for backport that starts the job
scheduler after we initialize external storage.

Fixes cockroachdb#114842

Epic: none

Release note (bug fix): Fix a bug where scheduled jobs using external
storage providers may fail shortly after node startup.

Co-authored-by: Steven Danna <[email protected]>
  • Loading branch information
craig[bot] and stevendanna committed Dec 6, 2023
2 parents 7b54813 + ebdc06c commit ccdc317
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 27 deletions.
11 changes: 9 additions & 2 deletions pkg/server/external_storage_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,18 @@ func (e *externalStorageBuilder) init(
registry.AddMetricStruct(e.metrics)
}

// assertInitComplete reports whether init has already been called on
// this builder. It returns nil when initCalled is set and an assertion
// failure error otherwise.
func (e *externalStorageBuilder) assertInitComplete() error {
	if e.initCalled {
		return nil
	}
	return errors.AssertionFailedf("external storage not initialized")
}

func (e *externalStorageBuilder) makeExternalStorage(
ctx context.Context, dest cloudpb.ExternalStorage, opts ...cloud.ExternalStorageOption,
) (cloud.ExternalStorage, error) {
if !e.initCalled {
return nil, errors.New("cannot create external storage before init")
return nil, errors.AssertionFailedf("cannot create external storage before init")
}
return cloud.MakeExternalStorage(
ctx, dest, e.conf, e.settings, e.blobClientFactory, e.db, e.limiters, e.metrics,
Expand All @@ -92,7 +99,7 @@ func (e *externalStorageBuilder) makeExternalStorageFromURI(
ctx context.Context, uri string, user username.SQLUsername, opts ...cloud.ExternalStorageOption,
) (cloud.ExternalStorage, error) {
if !e.initCalled {
return nil, errors.New("cannot create external storage before init")
return nil, errors.AssertionFailedf("cannot create external storage before init")
}
return cloud.ExternalStorageFromURI(
ctx, uri, e.conf, e.settings, e.blobClientFactory, user, e.db, e.limiters, e.metrics,
Expand Down
29 changes: 29 additions & 0 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2082,6 +2082,12 @@ func (s *topLevelServer) PreStart(ctx context.Context) error {
return err
}

// Start the job scheduler now that the SQL Server and
// external storage are initialized.
if err := s.initJobScheduler(ctx); err != nil {
return err
}

// If enabled, start reporting diagnostics.
if s.cfg.StartDiagnosticsReporting && !cluster.TelemetryOptOut {
s.startDiagnostics(workersCtx)
Expand Down Expand Up @@ -2198,6 +2204,29 @@ func (s *topLevelServer) PreStart(ctx context.Context) error {
return maybeImportTS(ctx, s)
}

// initJobScheduler kicks off the job scheduler owned by the SQL server.
// Callers must invoke it only once sqlServer.preStart has run and the
// external storage providers have been initialized. It is a no-op when
// the SQL server is disabled in the server config, and it returns an
// error without starting anything if external storage has not been
// initialized yet.
//
// TODO(ssd): We need to clean up the ordering/ownership here. The SQL
// server owns the job scheduler because the job scheduler needs an
// internal executor. But, the topLevelServer owns initialization of
// the external storage providers.
func (s *topLevelServer) initJobScheduler(ctx context.Context) error {
	if s.cfg.DisableSQLServer {
		return nil
	}

	// Jobs launched by the scheduler may need external storage right
	// away. Server startup ordering is supposed to guarantee that it is
	// available by now, so a failure here indicates a programming error
	// in the startup sequence rather than a runtime condition.
	err := s.externalStorageBuilder.assertInitComplete()
	if err != nil {
		return err
	}

	s.sqlServer.startJobScheduler(ctx, s.cfg.TestingKnobs)
	return nil
}

// AcceptClients starts listening for incoming SQL clients over the network.
// This mirrors the implementation of (*SQLServerWrapper).AcceptClients.
// TODO(knz): Find a way to implement this method only once for both.
Expand Down
52 changes: 27 additions & 25 deletions pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -1681,31 +1681,6 @@ func (s *SQLServer) preStart(
// node. This also uses SQL.
s.leaseMgr.DeleteOrphanedLeases(ctx, orphanedLeasesTimeThresholdNanos)

// Start scheduled jobs daemon.
jobs.StartJobSchedulerDaemon(
ctx,
stopper,
s.metricsRegistry,
&scheduledjobs.JobExecutionConfig{
Settings: s.execCfg.Settings,
DB: s.execCfg.InternalDB,
TestingKnobs: knobs.JobsTestingKnobs,
PlanHookMaker: func(ctx context.Context, opName string, txn *kv.Txn, user username.SQLUsername) (interface{}, func()) {
// This is a hack to get around a Go package dependency cycle. See comment
// in sql/jobs/registry.go on planHookMaker.
return sql.NewInternalPlanner(
opName,
txn,
user,
&sql.MemoryMetrics{},
s.execCfg,
sql.NewInternalSessionData(ctx, s.execCfg.Settings, opName),
)
},
},
scheduledjobs.ProdJobSchedulerEnv,
)

scheduledlogging.Start(
ctx, stopper, s.execCfg.InternalDB, s.execCfg.Settings,
s.execCfg.CaptureIndexUsageStatsKnobs,
Expand Down Expand Up @@ -1749,6 +1724,33 @@ func (s *SQLServer) preStart(
return nil
}

// startJobScheduler starts the scheduled jobs daemon using this SQL
// server's stopper, metrics registry and executor config. The jobs
// testing knobs are taken from the supplied knobs.
func (s *SQLServer) startJobScheduler(ctx context.Context, knobs base.TestingKnobs) {
	// Handing out an internal planner through a callback is a hack to
	// get around a Go package dependency cycle. See comment in
	// sql/jobs/registry.go on planHookMaker.
	makePlanHook := func(
		ctx context.Context, opName string, txn *kv.Txn, user username.SQLUsername,
	) (interface{}, func()) {
		sessionData := sql.NewInternalSessionData(ctx, s.execCfg.Settings, opName)
		return sql.NewInternalPlanner(
			opName, txn, user, &sql.MemoryMetrics{}, s.execCfg, sessionData,
		)
	}

	execConfig := &scheduledjobs.JobExecutionConfig{
		Settings:      s.execCfg.Settings,
		DB:            s.execCfg.InternalDB,
		TestingKnobs:  knobs.JobsTestingKnobs,
		PlanHookMaker: makePlanHook,
	}

	// Start scheduled jobs daemon.
	jobs.StartJobSchedulerDaemon(
		ctx, s.stopper, s.metricsRegistry, execConfig, scheduledjobs.ProdJobSchedulerEnv,
	)
}

// waitForActiveAutoConfigEnvironments waits until the set of
// ActiveEnvironments is empty. ActiveEnvironments is empty once there
// are no more tasks to run.
Expand Down
13 changes: 13 additions & 0 deletions pkg/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1050,6 +1050,19 @@ func TestAssertEnginesEmpty(t *testing.T) {
require.Error(t, assertEnginesEmpty([]storage.Engine{eng}))
}

// TestAssertExternalStorageInitializedBeforeJobSchedulerStart checks
// that initJobScheduler returns an error when the external storage
// builder has never been initialized. It is a bit silly, but it guards
// against accidentally reordering external storage setup in a way that
// would break the job scheduler.
func TestAssertExternalStorageInitializedBeforeJobSchedulerStart(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	// A zero-value builder has initCalled unset, so the assertion in
	// initJobScheduler must trip before any scheduler is started.
	srv := &topLevelServer{externalStorageBuilder: &externalStorageBuilder{}}
	err := srv.initJobScheduler(context.Background())
	require.Error(t, err)
}

func Test_makeFakeNodeStatuses(t *testing.T) {
defer leaktest.AfterTest(t)()

Expand Down
31 changes: 31 additions & 0 deletions pkg/server/tenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -845,6 +845,12 @@ func (s *SQLServerWrapper) PreStart(ctx context.Context) error {
s.registry,
)

// Start the job scheduler now that the SQL Server and
// external storage are initialized.
if err := s.initJobScheduler(ctx); err != nil {
return err
}

// If enabled, start reporting diagnostics.
if s.sqlServer.cfg.StartDiagnosticsReporting && !cluster.TelemetryOptOut {
s.startDiagnostics(workersCtx)
Expand Down Expand Up @@ -920,6 +926,31 @@ func (s *SQLServerWrapper) serveConn(
}
}

// initJobScheduler kicks off the job scheduler owned by the SQL server.
// Callers must invoke it only once sqlServer.preStart has run and the
// external storage providers have been initialized. It is a no-op when
// the SQL server is disabled in the server config, and it returns an
// error without starting anything if external storage has not been
// initialized yet.
//
// TODO(ssd): We need to clean up the ordering/ownership here. The SQL
// server owns the job scheduler because the job scheduler needs an
// internal executor. But, the topLevelServer owns initialization of
// the external storage providers.
//
// TODO(ssd): Remove duplication with *topLevelServer.
func (s *SQLServerWrapper) initJobScheduler(ctx context.Context) error {
	if s.cfg.DisableSQLServer {
		return nil
	}

	// Jobs launched by the scheduler may need external storage right
	// away. Server startup ordering is supposed to guarantee that it is
	// available by now, so a failure here indicates a programming error
	// in the startup sequence rather than a runtime condition.
	err := s.externalStorageBuilder.assertInitComplete()
	if err != nil {
		return err
	}

	s.sqlServer.startJobScheduler(ctx, s.cfg.TestingKnobs)
	return nil
}

// AcceptClients starts listening for incoming SQL clients over the network.
// This mirrors the implementation of (*Server).AcceptClients.
// TODO(knz): Find a way to implement this method only once for both.
Expand Down

0 comments on commit ccdc317

Please sign in to comment.