Skip to content

Commit

Permalink
Merge pull request #45828 from dt/jobs-before-migrations
Browse files Browse the repository at this point in the history
server: start jobs registry before running startup migrations
  • Loading branch information
dt authored Mar 7, 2020
2 parents e7769aa + 2cefc00 commit 50b31b6
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 71 deletions.
60 changes: 31 additions & 29 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1700,23 +1700,6 @@ func (s *Server) Start(ctx context.Context) error {
s.ClusterSettings(),
)

var bootstrapVersion roachpb.Version
if err := s.db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
return txn.GetProto(ctx, keys.BootstrapVersionKey, &bootstrapVersion)
}); err != nil {
return err
}
if err := migMgr.EnsureMigrations(ctx, bootstrapVersion); err != nil {
select {
case <-s.stopper.ShouldQuiesce():
// Avoid turning an early shutdown into a fatal error. See #19579.
return errors.New("server is shutting down")
default:
log.Fatalf(ctx, "%+v", err)
}
}
log.Infof(ctx, "done ensuring all necessary migrations have run")

// Start garbage collecting system events.
s.startSystemLogsGC(ctx)

Expand Down Expand Up @@ -1758,6 +1741,37 @@ func (s *Server) Start(ctx context.Context) error {
s.mux.Handle(statusVars, http.HandlerFunc(s.status.handleVars))
log.Event(ctx, "added http endpoints")

// Start the jobs subsystem.
{
var regLiveness jobs.NodeLiveness = s.nodeLiveness
if testingLiveness := s.cfg.TestingKnobs.RegistryLiveness; testingLiveness != nil {
regLiveness = testingLiveness.(*jobs.FakeNodeLiveness)
}
if err := s.jobRegistry.Start(
ctx, s.stopper, regLiveness, jobs.DefaultCancelInterval, jobs.DefaultAdoptInterval,
); err != nil {
return err
}
}

// Run startup migrations (note: these depend on jobs subsystem running).
var bootstrapVersion roachpb.Version
if err := s.db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
return txn.GetProto(ctx, keys.BootstrapVersionKey, &bootstrapVersion)
}); err != nil {
return err
}
if err := migMgr.EnsureMigrations(ctx, bootstrapVersion); err != nil {
select {
case <-s.stopper.ShouldQuiesce():
// Avoid turning an early shutdown into a fatal error. See #19579.
return errors.New("server is shutting down")
default:
log.Fatalf(ctx, "%+v", err)
}
}
log.Infof(ctx, "done ensuring all necessary migrations have run")

// Attempt to upgrade cluster version.
s.startAttemptUpgrade(ctx)

Expand Down Expand Up @@ -1794,18 +1808,6 @@ func (s *Server) Start(ctx context.Context) error {
// node. This also uses SQL.
s.leaseMgr.DeleteOrphanedLeases(timeThreshold)

{
var regLiveness jobs.NodeLiveness = s.nodeLiveness
if testingLiveness := s.cfg.TestingKnobs.RegistryLiveness; testingLiveness != nil {
regLiveness = testingLiveness.(*jobs.FakeNodeLiveness)
}
if err := s.jobRegistry.Start(
ctx, s.stopper, regLiveness, jobs.DefaultCancelInterval, jobs.DefaultAdoptInterval,
); err != nil {
return err
}
}

log.Event(ctx, "server ready")

return nil
Expand Down
16 changes: 13 additions & 3 deletions pkg/sql/sqlbase/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,10 @@ CREATE TABLE system.jobs (
status STRING NOT NULL,
created TIMESTAMP NOT NULL DEFAULT now(),
payload BYTES NOT NULL,
progress BYTES,
INDEX (status, created),
FAMILY (id, status, created, payload)
FAMILY (id, status, created, payload),
FAMILY progress (progress)
);`

// web_sessions are used to track authenticated user actions over stateless
Expand Down Expand Up @@ -717,17 +719,25 @@ var (
{Name: "status", ID: 2, Type: *types.String},
{Name: "created", ID: 3, Type: *types.Timestamp, DefaultExpr: &nowString},
{Name: "payload", ID: 4, Type: *types.Bytes},
{Name: "progress", ID: 5, Type: *types.Bytes, Nullable: true},
},
NextColumnID: 5,
NextColumnID: 6,
Families: []ColumnFamilyDescriptor{
{
Name: "fam_0_id_status_created_payload",
ID: 0,
ColumnNames: []string{"id", "status", "created", "payload"},
ColumnIDs: []ColumnID{1, 2, 3, 4},
},
{
Name: "progress",
ID: 1,
ColumnNames: []string{"progress"},
ColumnIDs: []ColumnID{5},
DefaultColumnID: 5,
},
},
NextFamilyID: 1,
NextFamilyID: 2,
PrimaryIndex: pk("id"),
Indexes: []IndexDescriptor{
{
Expand Down
41 changes: 2 additions & 39 deletions pkg/sqlmigrations/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/sqlmigrations/leasemanager"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -182,10 +181,8 @@ var backwardCompatibleMigrations = []migrationDescriptor{
newDescriptorIDs: databaseIDs(sessiondata.DefaultDatabaseName, sessiondata.PgDatabaseName),
},
{
// Introduced in v2.1.
// TODO(dt): Bake into v19.1.
name: "add progress to system.jobs",
workFn: addJobsProgress,
// Introduced in v2.1. Baked into v20.1.
name: "add progress to system.jobs",
},
{
// Introduced in v19.1.
Expand Down Expand Up @@ -1020,40 +1017,6 @@ func createDefaultDbs(ctx context.Context, r runner) error {
return err
}

// addJobsProgress is a startup migration that adds a nullable BYTES column
// named "progress" to the system.jobs table descriptor and places it in a
// column family that is also named "progress".
//
// The natural way to do this would be a SQL statement along the lines of:
//   ALTER TABLE system.jobs ADD COLUMN progress BYTES CREATE FAMILY progress;
// SQL-managed schema changes, however, track their own work through the jobs
// table, and that tracking fails while the jobs table has not yet been
// migrated. Instead of teaching the jobs code to cope with both the pre- and
// post-migration schemas, this migration sidesteps the chicken-and-egg problem
// by editing the table descriptor by hand inside a single transaction.
//
// The migration is a no-op when an active column named "progress" is already
// present. Any error from the transaction is returned unchanged.
func addJobsProgress(ctx context.Context, r runner) error {
	const progressName = "progress"

	// rewriteJobsDesc is the transaction body: it loads the jobs table
	// descriptor, appends the column if needed, and writes the descriptor back.
	rewriteJobsDesc := func(ctx context.Context, txn *client.Txn) error {
		// NOTE(review): presumably required because the write below touches a
		// system descriptor key — confirm against client.Txn documentation.
		if trigErr := txn.SetSystemConfigTrigger(); trigErr != nil {
			return trigErr
		}

		jobsDesc, loadErr := sqlbase.GetMutableTableDescFromID(ctx, txn, keys.JobsTableID)
		if loadErr != nil {
			return loadErr
		}

		// A successful lookup means the column already exists, so there is
		// nothing left to do.
		_, lookupErr := jobsDesc.FindActiveColumnByName(progressName)
		if lookupErr == nil {
			return nil
		}

		progressCol := &sqlbase.ColumnDescriptor{
			Name:     progressName,
			Type:     *types.Bytes,
			Nullable: true,
		}
		jobsDesc.AddColumn(progressCol)

		// Column and family share the same name. NOTE(review): the two boolean
		// arguments presumably mean "create the family" and "not if-not-exists"
		// — confirm against AddColumnToFamilyMaybeCreate's signature.
		famErr := jobsDesc.AddColumnToFamilyMaybeCreate(progressName, progressName, true, false)
		if famErr != nil {
			return famErr
		}

		// IDs for the newly added column/family are assigned here rather than
		// being set explicitly above.
		if idErr := jobsDesc.AllocateIDs(); idErr != nil {
			return idErr
		}

		descKey := sqlbase.MakeDescMetadataKey(jobsDesc.ID)
		return txn.Put(ctx, descKey, sqlbase.WrapDescriptor(jobsDesc))
	}

	return r.db.Txn(ctx, rewriteJobsDesc)
}

func retireOldTsPurgeIntervalSettings(ctx context.Context, r runner) error {
// We are going to deprecate `timeseries.storage.10s_resolution_ttl`
// into `timeseries.storage.resolution_10s.ttl` if the latter is not
Expand Down

0 comments on commit 50b31b6

Please sign in to comment.