Skip to content

Commit

Permalink
server,util: avoid retry forever during startup upon premature shutdown
Browse files Browse the repository at this point in the history
We need to have the `RunIdempotentWithRetry` calls abort if the
surrounding server shuts down prematurely. This happens most
frequently in tests that fail for another reason, and also
in multitenancy tests with multiple tenant servers side-by-side.

Release note: None
  • Loading branch information
knz committed Apr 4, 2023
1 parent a8f0efe commit f849ef6
Show file tree
Hide file tree
Showing 8 changed files with 70 additions and 37 deletions.
3 changes: 2 additions & 1 deletion pkg/keyvisualizer/keyvissubscriber/boundary_subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ func Start(
handleBoundaryUpdate func(update *keyvispb.UpdateBoundariesRequest),
) error {

tableID, err := startup.RunIdempotentWithRetryEx(ctx, "obs lookup system table",
tableID, err := startup.RunIdempotentWithRetryEx(ctx, stopper.ShouldQuiesce(),
"obs lookup system table",
func(ctx context.Context) (descpb.ID, error) {
return sysTableResolver.LookupSystemTableID(
ctx, systemschema.SpanStatsTenantBoundariesTable.GetName())
Expand Down
1 change: 1 addition & 0 deletions pkg/server/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -1011,6 +1011,7 @@ func (n *Node) startWriteNodeStatus(frequency time.Duration) error {
// will only update the key if it exists, to avoid race conditions during
// node decommissioning, so we have to error out if we can't create it.
if err := startup.RunIdempotentWithRetry(ctx,
n.stopper.ShouldQuiesce(),
"kv write node status", func(ctx context.Context) error {
return n.writeNodeStatus(ctx, 0 /* alertTTL */, false /* mustExist */)
}); err != nil {
Expand Down
4 changes: 3 additions & 1 deletion pkg/server/server_controller_orchestration.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,9 @@ func (c *serverController) startInitialSecondaryTenantServers(
ctx context.Context, ie isql.Executor,
) error {
// The list of tenants that should have a running server.
reqTenants, err := startup.RunIdempotentWithRetryEx(ctx, "get expected running tenants",
reqTenants, err := startup.RunIdempotentWithRetryEx(ctx,
c.stopper.ShouldQuiesce(),
"get expected running tenants",
func(ctx context.Context) ([]roachpb.TenantName, error) {
return c.getExpectedRunningTenants(ctx, ie)
})
Expand Down
34 changes: 21 additions & 13 deletions pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -1427,7 +1427,9 @@ func (s *SQLServer) preStart(
// Load the multi-region enum by reading the system database's descriptor.
// This also serves as a simple check to see if a tenant exists (i.e. by
// checking whether the system db has been bootstrapped).
regionPhysicalRep, err := startup.RunIdempotentWithRetryEx(ctx, "sql get locality",
regionPhysicalRep, err := startup.RunIdempotentWithRetryEx(ctx,
stopper.ShouldQuiesce(),
"sql get locality",
func(ctx context.Context) ([]byte, error) {
res, err := sql.GetLocalityRegionEnumPhysicalRepresentation(
ctx, s.internalDB, keys.SystemDatabaseID, s.distSQLServer.Locality,
Expand Down Expand Up @@ -1465,7 +1467,9 @@ func (s *SQLServer) preStart(
// ID. Otherwise, allow our SQL instance ID to be generated by
// SQL.
nodeID, hasNodeID := s.sqlIDContainer.OptionalNodeID()
instance, err := startup.RunIdempotentWithRetryEx(ctx, "sql create node instance row",
instance, err := startup.RunIdempotentWithRetryEx(ctx,
stopper.ShouldQuiesce(),
"sql create node instance row",
func(ctx context.Context) (sqlinstance.InstanceInfo, error) {
if hasNodeID {
// Write/acquire our instance row.
Expand Down Expand Up @@ -1554,11 +1558,13 @@ func (s *SQLServer) preStart(

var bootstrapVersion roachpb.Version
if s.execCfg.Codec.ForSystemTenant() {
if err := startup.RunIdempotentWithRetry(ctx, "sql get cluster version", func(ctx context.Context) error {
return s.execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
return txn.GetProto(ctx, keys.BootstrapVersionKey, &bootstrapVersion)
})
}); err != nil {
if err := startup.RunIdempotentWithRetry(ctx,
s.stopper.ShouldQuiesce(),
"sql get cluster version", func(ctx context.Context) error {
return s.execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
return txn.GetProto(ctx, keys.BootstrapVersionKey, &bootstrapVersion)
})
}); err != nil {
return err
}
} else {
Expand Down Expand Up @@ -1613,12 +1619,14 @@ func (s *SQLServer) preStart(
// "system.settings" table of this tenant. This includes both system
// and secondary tenants.
var tenantActiveVersion clusterversion.ClusterVersion
if err := startup.RunIdempotentWithRetry(ctx, "sql get tenant version", func(ctx context.Context) error {
return s.execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) {
tenantActiveVersion, err = s.settingsWatcher.GetClusterVersionFromStorage(ctx, txn)
return err
})
}); err != nil {
if err := startup.RunIdempotentWithRetry(ctx,
s.stopper.ShouldQuiesce(),
"sql get tenant version", func(ctx context.Context) error {
return s.execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) {
tenantActiveVersion, err = s.settingsWatcher.GetClusterVersionFromStorage(ctx, txn)
return err
})
}); err != nil {
return err
}
if s.execCfg.Settings.Version.BinaryVersion().Less(tenantActiveVersion.Version) {
Expand Down
4 changes: 3 additions & 1 deletion pkg/server/tenantsettingswatcher/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,9 @@ func (w *Watcher) startRangeFeed(
) error {
// We need to retry unavailable replicas here. This is only meant to be called
// at server startup.
tableID, err := startup.RunIdempotentWithRetryEx(ctx, "tenant start setting rangefeed",
tableID, err := startup.RunIdempotentWithRetryEx(ctx,
w.stopper.ShouldQuiesce(),
"tenant start setting rangefeed",
func(ctx context.Context) (descpb.ID, error) {
return sysTableResolver.LookupSystemTableID(ctx, systemschema.TenantSettingsTable.GetName())
})
Expand Down
1 change: 1 addition & 0 deletions pkg/upgrade/upgrademanager/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ go_library(
"//pkg/upgrade/upgrades",
"//pkg/util/log",
"//pkg/util/startup",
"//pkg/util/stop",
"//pkg/util/uuid",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_logtags//:logtags",
Expand Down
49 changes: 31 additions & 18 deletions pkg/upgrade/upgrademanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/upgrade/upgrades"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/startup"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
Expand Down Expand Up @@ -212,7 +213,9 @@ func (m *Manager) RunPermanentUpgrades(ctx context.Context, upToVersion roachpb.
latest := permanentUpgrades[len(permanentUpgrades)-1]
lastVer := latest.Version()
enterpriseEnabled := base.CCLDistributionAndEnterpriseEnabled(m.settings, m.clusterID)
lastUpgradeCompleted, err := startup.RunIdempotentWithRetryEx(ctx, "check if migration completed",
lastUpgradeCompleted, err := startup.RunIdempotentWithRetryEx(ctx,
m.deps.Stopper.ShouldQuiesce(),
"check if migration completed",
func(ctx context.Context) (bool, error) {
return migrationstable.CheckIfMigrationCompleted(
ctx, lastVer, nil /* txn */, m.ie,
Expand Down Expand Up @@ -256,7 +259,7 @@ func (m *Manager) RunPermanentUpgrades(ctx context.Context, upToVersion roachpb.
//
// TODO(andrei): Get rid of this once compatibility with 22.2 is not necessary.
startupMigrationAlreadyRan, err := checkOldStartupMigrationRan(
ctx, u.V22_2StartupMigrationName(), m.deps.DB.KV(), m.codec)
ctx, m.deps.Stopper, u.V22_2StartupMigrationName(), m.deps.DB.KV(), m.codec)
if err != nil {
return err
}
Expand All @@ -267,9 +270,11 @@ func (m *Manager) RunPermanentUpgrades(ctx context.Context, upToVersion roachpb.
u.Version())
// Mark the upgrade as completed so that we can get rid of this logic when
// compatibility with 22.2 is no longer necessary.
if err := startup.RunIdempotentWithRetry(ctx, "mark upgrade complete", func(ctx context.Context) (err error) {
return migrationstable.MarkMigrationCompletedIdempotent(ctx, m.ie, u.Version())
}); err != nil {
if err := startup.RunIdempotentWithRetry(ctx,
m.deps.Stopper.ShouldQuiesce(),
"mark upgrade complete", func(ctx context.Context) (err error) {
return migrationstable.MarkMigrationCompletedIdempotent(ctx, m.ie, u.Version())
}); err != nil {
return err
}
continue
Expand All @@ -287,13 +292,15 @@ func (m *Manager) RunPermanentUpgrades(ctx context.Context, upToVersion roachpb.
// old startupmigration with the given name has run. If it did, the
// corresponding upgrade should not run.
func checkOldStartupMigrationRan(
ctx context.Context, migrationName string, db *kv.DB, codec keys.SQLCodec,
ctx context.Context, stopper *stop.Stopper, migrationName string, db *kv.DB, codec keys.SQLCodec,
) (bool, error) {
if migrationName == "" {
return false, nil
}
migrationKey := append(codec.StartupMigrationKeyPrefix(), roachpb.RKey(migrationName)...)
kv, err := startup.RunIdempotentWithRetryEx(ctx, "check old startup migration",
kv, err := startup.RunIdempotentWithRetryEx(ctx,
stopper.ShouldQuiesce(),
"check old startup migration",
func(ctx context.Context) (kv kv.KeyValue, err error) {
return db.Get(ctx, migrationKey)
})
Expand Down Expand Up @@ -715,23 +722,29 @@ func (m *Manager) runMigration(
alreadyCompleted, alreadyExisting bool
id jobspb.JobID
)
if err := startup.RunIdempotentWithRetry(ctx, "upgrade create job", func(ctx context.Context) (err error) {
alreadyCompleted, alreadyExisting, id, err = m.getOrCreateMigrationJob(ctx, user, version,
mig.Name())
return err
}); alreadyCompleted || err != nil {
if err := startup.RunIdempotentWithRetry(ctx,
m.deps.Stopper.ShouldQuiesce(),
"upgrade create job", func(ctx context.Context) (err error) {
alreadyCompleted, alreadyExisting, id, err = m.getOrCreateMigrationJob(ctx, user, version,
mig.Name())
return err
}); alreadyCompleted || err != nil {
return err
}
if alreadyExisting {
log.Infof(ctx, "waiting for %s", mig.Name())
return startup.RunIdempotentWithRetry(ctx, "upgrade wait jobs", func(ctx context.Context) error {
return m.jr.WaitForJobs(ctx, []jobspb.JobID{id})
})
return startup.RunIdempotentWithRetry(ctx,
m.deps.Stopper.ShouldQuiesce(),
"upgrade wait jobs", func(ctx context.Context) error {
return m.jr.WaitForJobs(ctx, []jobspb.JobID{id})
})
} else {
log.Infof(ctx, "running %s", mig.Name())
return startup.RunIdempotentWithRetry(ctx, "upgrade run jobs", func(ctx context.Context) error {
return m.jr.Run(ctx, []jobspb.JobID{id})
})
return startup.RunIdempotentWithRetry(ctx,
m.deps.Stopper.ShouldQuiesce(),
"upgrade run jobs", func(ctx context.Context) error {
return m.jr.Run(ctx, []jobspb.JobID{id})
})
}
}
}
Expand Down
11 changes: 8 additions & 3 deletions pkg/util/startup/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,9 @@ func WithoutChecks(ctx context.Context) context.Context {
// retry should be performed explicitly while using WithoutChecks context
// to suppress safety mechanisms.
func RunIdempotentWithRetry(
ctx context.Context, opName string, f func(ctx context.Context) error,
ctx context.Context, quiesce <-chan struct{}, opName string, f func(ctx context.Context) error,
) error {
_, err := RunIdempotentWithRetryEx(ctx, opName, func(ctx context.Context) (any, error) {
_, err := RunIdempotentWithRetryEx(ctx, quiesce, opName, func(ctx context.Context) (any, error) {
return nil, f(ctx)
})
return err
Expand All @@ -124,13 +124,18 @@ func RunIdempotentWithRetry(
// RunIdempotentWithRetryEx runs a function that returns a value, with startup retry.
// See RunIdempotentWithRetry for important details.
func RunIdempotentWithRetryEx[T any](
ctx context.Context, opName string, f func(ctx context.Context) (T, error),
ctx context.Context,
quiesce <-chan struct{},
opName string,
f func(ctx context.Context) (T, error),
) (T, error) {
ctx = context.WithValue(ctx, startupRetryKey{}, "in retry")
every := log.Every(5 * time.Second)
// Retry failures indefinitely until the context is cancelled or the
// quiesce channel is closed (i.e. the surrounding server is shutting down).
var result T
var err error
retryOpts := startupRetryOpts
retryOpts.Closer = quiesce
for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); {
result, err = f(ctx)
if err == nil {
Expand Down

0 comments on commit f849ef6

Please sign in to comment.