Skip to content

Commit

Permalink
server: start jobs before running migrations
Browse files Browse the repository at this point in the history
Migrations sometimes try to run jobs, for example running schema changes
or other operations that are executed as jobs. Thus the jobs system must
be started before the migrations are run.

Release note: none.
  • Loading branch information
dt committed Mar 7, 2020
1 parent c5e8099 commit 2cefc00
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 34 deletions.
60 changes: 31 additions & 29 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1700,23 +1700,6 @@ func (s *Server) Start(ctx context.Context) error {
s.ClusterSettings(),
)

var bootstrapVersion roachpb.Version
if err := s.db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
return txn.GetProto(ctx, keys.BootstrapVersionKey, &bootstrapVersion)
}); err != nil {
return err
}
if err := migMgr.EnsureMigrations(ctx, bootstrapVersion); err != nil {
select {
case <-s.stopper.ShouldQuiesce():
// Avoid turning an early shutdown into a fatal error. See #19579.
return errors.New("server is shutting down")
default:
log.Fatalf(ctx, "%+v", err)
}
}
log.Infof(ctx, "done ensuring all necessary migrations have run")

// Start garbage collecting system events.
s.startSystemLogsGC(ctx)

Expand Down Expand Up @@ -1758,6 +1741,37 @@ func (s *Server) Start(ctx context.Context) error {
s.mux.Handle(statusVars, http.HandlerFunc(s.status.handleVars))
log.Event(ctx, "added http endpoints")

// Start the jobs subsystem.
{
var regLiveness jobs.NodeLiveness = s.nodeLiveness
if testingLiveness := s.cfg.TestingKnobs.RegistryLiveness; testingLiveness != nil {
regLiveness = testingLiveness.(*jobs.FakeNodeLiveness)
}
if err := s.jobRegistry.Start(
ctx, s.stopper, regLiveness, jobs.DefaultCancelInterval, jobs.DefaultAdoptInterval,
); err != nil {
return err
}
}

// Run startup migrations (note: these depend on jobs subsystem running).
var bootstrapVersion roachpb.Version
if err := s.db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
return txn.GetProto(ctx, keys.BootstrapVersionKey, &bootstrapVersion)
}); err != nil {
return err
}
if err := migMgr.EnsureMigrations(ctx, bootstrapVersion); err != nil {
select {
case <-s.stopper.ShouldQuiesce():
// Avoid turning an early shutdown into a fatal error. See #19579.
return errors.New("server is shutting down")
default:
log.Fatalf(ctx, "%+v", err)
}
}
log.Infof(ctx, "done ensuring all necessary migrations have run")

// Attempt to upgrade cluster version.
s.startAttemptUpgrade(ctx)

Expand Down Expand Up @@ -1794,18 +1808,6 @@ func (s *Server) Start(ctx context.Context) error {
// node. This also uses SQL.
s.leaseMgr.DeleteOrphanedLeases(timeThreshold)

{
var regLiveness jobs.NodeLiveness = s.nodeLiveness
if testingLiveness := s.cfg.TestingKnobs.RegistryLiveness; testingLiveness != nil {
regLiveness = testingLiveness.(*jobs.FakeNodeLiveness)
}
if err := s.jobRegistry.Start(
ctx, s.stopper, regLiveness, jobs.DefaultCancelInterval, jobs.DefaultAdoptInterval,
); err != nil {
return err
}
}

log.Event(ctx, "server ready")

return nil
Expand Down
11 changes: 6 additions & 5 deletions pkg/sql/sqlbase/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ CREATE TABLE system.jobs (
status STRING NOT NULL,
created TIMESTAMP NOT NULL DEFAULT now(),
payload BYTES NOT NULL,
payload BYTES,
progress BYTES,
INDEX (status, created),
FAMILY (id, status, created, payload),
FAMILY progress (progress)
Expand Down Expand Up @@ -730,10 +730,11 @@ var (
ColumnIDs: []ColumnID{1, 2, 3, 4},
},
{
Name: "fam_1_progress",
ID: 1,
ColumnNames: []string{"progress"},
ColumnIDs: []ColumnID{5},
Name: "progress",
ID: 1,
ColumnNames: []string{"progress"},
ColumnIDs: []ColumnID{5},
DefaultColumnID: 5,
},
},
NextFamilyID: 2,
Expand Down

0 comments on commit 2cefc00

Please sign in to comment.