server,util: avoid retrying forever during startup upon premature shutdown
We need the `RunIdempotentWithRetry` calls to abort if the
surrounding server shuts down prematurely. This happens most
frequently in tests that fail for another reason, and also in
multitenancy tests that run multiple tenant servers side by side.

Release note: None
knz authored and aliher1911 committed Apr 26, 2023
1 parent e863cef commit 3a217ee
Showing 5 changed files with 22 additions and 14 deletions.
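
For orientation, here is a minimal standalone sketch of the pattern this commit establishes: startup retries now take a quiesce channel and give up once it closes. The helper below is a simplified stand-in for `startup.RunIdempotentWithRetry` (fixed backoff, no logging), not the real CockroachDB API, and the names in `main` are illustrative.

package main

import (
    "context"
    "errors"
    "fmt"
    "time"
)

// runIdempotentWithRetry retries f until it succeeds, the context is
// cancelled, or the quiesce channel is closed by a shutting-down server.
func runIdempotentWithRetry(
    ctx context.Context, quiesce <-chan struct{}, opName string, f func(ctx context.Context) error,
) error {
    var err error
    for {
        if err = f(ctx); err == nil {
            return nil
        }
        select {
        case <-ctx.Done():
            return err
        case <-quiesce:
            // The surrounding server is shutting down prematurely:
            // give up instead of retrying forever.
            return fmt.Errorf("%s: aborted by server shutdown: %w", opName, err)
        case <-time.After(10 * time.Millisecond): // fixed backoff, for brevity
        }
    }
}

func main() {
    quiesce := make(chan struct{})
    go func() {
        // Simulate a premature shutdown shortly after startup begins.
        time.Sleep(50 * time.Millisecond)
        close(quiesce)
    }()

    err := runIdempotentWithRetry(context.Background(), quiesce,
        "kv write node status", func(ctx context.Context) error {
            return errors.New("kv not ready") // always fails, as in a dying test server
        })
    fmt.Println(err)
}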
1 change: 1 addition & 0 deletions pkg/server/node.go
@@ -837,6 +837,7 @@ func (n *Node) startWriteNodeStatus(frequency time.Duration) error {
     // will only update the key if it exists, to avoid race conditions during
     // node decommissioning, so we have to error out if we can't create it.
     if err := startup.RunIdempotentWithRetry(ctx,
+        n.stopper.ShouldQuiesce(),
         "kv write node status", func(ctx context.Context) error {
             return n.writeNodeStatus(ctx, 0 /* alertTTL */, false /* mustExist */)
         }); err != nil {
12 changes: 7 additions & 5 deletions pkg/server/server_sql.go
@@ -1252,11 +1252,13 @@ func (s *SQLServer) preStart(

     var bootstrapVersion roachpb.Version
     if s.execCfg.Codec.ForSystemTenant() {
-        if err := startup.RunIdempotentWithRetry(ctx, "sql get cluster version", func(ctx context.Context) error {
-            return s.execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
-                return txn.GetProto(ctx, keys.BootstrapVersionKey, &bootstrapVersion)
-            })
-        }); err != nil {
+        if err := startup.RunIdempotentWithRetry(ctx,
+            s.stopper.ShouldQuiesce(),
+            "sql get cluster version", func(ctx context.Context) error {
+                return s.execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
+                    return txn.GetProto(ctx, keys.BootstrapVersionKey, &bootstrapVersion)
+                })
+            }); err != nil {
             return err
         }
     } else {
1 change: 1 addition & 0 deletions pkg/server/tenantsettingswatcher/watcher.go
@@ -143,6 +143,7 @@ func (w *Watcher) startRangeFeed(
     // at server startup.
     var tableID descpb.ID
     err := startup.RunIdempotentWithRetry(ctx,
+        w.stopper.ShouldQuiesce(),
         "tenant start setting rangefeed",
         func(ctx context.Context) (err error) {
             tableID, err = sysTableResolver.LookupSystemTableID(ctx,
16 changes: 9 additions & 7 deletions pkg/startupmigrations/migrations.go
@@ -480,7 +480,7 @@ func ExpectedDescriptorIDs(
     defaultZoneConfig *zonepb.ZoneConfig,
     defaultSystemZoneConfig *zonepb.ZoneConfig,
 ) (descpb.IDs, error) {
-    completedMigrations, err := getCompletedMigrations(ctx, db, codec)
+    completedMigrations, err := getCompletedMigrations(ctx, make(chan struct{}), db, codec)
     if err != nil {
         return nil, err
     }
@@ -512,7 +512,7 @@ func (m *Manager) EnsureMigrations(ctx context.Context, bootstrapVersion roachpb
         defer m.testingKnobs.AfterEnsureMigrations()
     }
     // First, check whether there are any migrations that need to be run.
-    completedMigrations, err := getCompletedMigrations(ctx, m.db, m.codec)
+    completedMigrations, err := getCompletedMigrations(ctx, m.stopper.ShouldQuiesce(), m.db, m.codec)
     if err != nil {
         return err
     }
@@ -599,7 +599,7 @@ func (m *Manager) EnsureMigrations(ctx context.Context, bootstrapVersion roachpb

     // Re-get the list of migrations in case any of them were completed between
     // our initial check and our grabbing of the lease.
-    completedMigrations, err = getCompletedMigrations(ctx, m.db, m.codec)
+    completedMigrations, err = getCompletedMigrations(ctx, m.stopper.ShouldQuiesce(), m.db, m.codec)
     if err != nil {
         return err
     }
@@ -630,15 +630,15 @@ func (m *Manager) EnsureMigrations(ctx context.Context, bootstrapVersion roachpb
         if log.V(1) {
             log.Infof(ctx, "running migration %q", migration.name)
         }
-        if err := startup.RunIdempotentWithRetry(ctx, migration.name,
+        if err := startup.RunIdempotentWithRetry(ctx, m.stopper.ShouldQuiesce(), migration.name,
             func(ctx context.Context) error {
                 return migration.workFn(ctx, r)
             }); err != nil {
             return errors.Wrapf(err, "failed to run migration %q", migration.name)
         }

         log.VEventf(ctx, 1, "persisting record of completing migration %s", migration.name)
-        if err := startup.RunIdempotentWithRetry(ctx,
+        if err := startup.RunIdempotentWithRetry(ctx, m.stopper.ShouldQuiesce(),
             "persist completed migration record",
             func(ctx context.Context) error { return m.db.Put(ctx, key, startTime) }); err != nil {
             return errors.Wrapf(err, "failed to persist record of completing migration %q",
@@ -674,14 +674,16 @@ func (m *Manager) shouldRunMigration(
 // this in other places are unlikely, but care must be taken in case some fixes
 // are backported.
 func getCompletedMigrations(
-    ctx context.Context, db DB, codec keys.SQLCodec,
+    ctx context.Context, quiesce <-chan struct{}, db DB, codec keys.SQLCodec,
 ) (map[string]struct{}, error) {
     if log.V(1) {
         log.Info(ctx, "trying to get the list of completed migrations")
     }
     prefix := codec.MigrationKeyPrefix()
     var keyvals []kv.KeyValue
-    err := startup.RunIdempotentWithRetry(ctx, "get completed migrations",
+    err := startup.RunIdempotentWithRetry(ctx,
+        quiesce,
+        "get completed migrations",
         func(ctx context.Context) (err error) {
             keyvals, err = db.Scan(ctx, prefix, prefix.PrefixEnd(), 0 /* maxRows */)
             return err
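Note that `ExpectedDescriptorIDs` passes `make(chan struct{})` for the new `quiesce` argument: a freshly allocated channel is never closed, so a select on it never fires, and that call site keeps the previous retry-until-context-cancellation behavior, presumably because no stopper is in scope there.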
6 changes: 4 additions & 2 deletions pkg/util/startup/retry.go
@@ -113,13 +113,15 @@ func WithoutChecks(ctx context.Context) context.Context {
 // retry should be performed explicitly while using WithoutChecks context
 // to suppress safety mechanisms.
 func RunIdempotentWithRetry(
-    ctx context.Context, opName string, f func(ctx context.Context) error,
+    ctx context.Context, quiesce <-chan struct{}, opName string, f func(ctx context.Context) error,
 ) error {
     ctx = context.WithValue(ctx, startupRetryKey{}, "in retry")
     every := log.Every(5 * time.Second)
     // Retry failures indefinitely until context is cancelled.
     var err error
-    for r := retry.StartWithCtx(ctx, startupRetryOpts); r.Next(); {
+    retryOpts := startupRetryOpts
+    retryOpts.Closer = quiesce
+    for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); {
         err = f(ctx)
         if err == nil {
             break
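The mechanism here is `retry.Options.Closer`, an optional channel in CockroachDB's `util/retry` package: once it is closed, `r.Next()` returns false and the loop exits, so `RunIdempotentWithRetry` stops retrying and returns the last error observed from `f`. Wiring the stopper's `ShouldQuiesce()` channel into `Closer` is what lets every call site above abort on premature shutdown.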
