Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server,util: avoid retry forever during startup upon premature shutdown #100583

Merged
merged 1 commit into from
Apr 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pkg/keyvisualizer/keyvissubscriber/boundary_subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ func Start(
handleBoundaryUpdate func(update *keyvispb.UpdateBoundariesRequest),
) error {

tableID, err := startup.RunIdempotentWithRetryEx(ctx, "obs lookup system table",
tableID, err := startup.RunIdempotentWithRetryEx(ctx, stopper.ShouldQuiesce(),
"obs lookup system table",
func(ctx context.Context) (descpb.ID, error) {
return sysTableResolver.LookupSystemTableID(
ctx, systemschema.SpanStatsTenantBoundariesTable.GetName())
Expand Down
1 change: 1 addition & 0 deletions pkg/server/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -1011,6 +1011,7 @@ func (n *Node) startWriteNodeStatus(frequency time.Duration) error {
// will only update the key if it exists, to avoid race conditions during
// node decommissioning, so we have to error out if we can't create it.
if err := startup.RunIdempotentWithRetry(ctx,
n.stopper.ShouldQuiesce(),
"kv write node status", func(ctx context.Context) error {
return n.writeNodeStatus(ctx, 0 /* alertTTL */, false /* mustExist */)
}); err != nil {
Expand Down
4 changes: 3 additions & 1 deletion pkg/server/server_controller_orchestration.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,9 @@ func (c *serverController) startInitialSecondaryTenantServers(
ctx context.Context, ie isql.Executor,
) error {
// The list of tenants that should have a running server.
reqTenants, err := startup.RunIdempotentWithRetryEx(ctx, "get expected running tenants",
reqTenants, err := startup.RunIdempotentWithRetryEx(ctx,
c.stopper.ShouldQuiesce(),
"get expected running tenants",
func(ctx context.Context) ([]roachpb.TenantName, error) {
return c.getExpectedRunningTenants(ctx, ie)
})
Expand Down
34 changes: 21 additions & 13 deletions pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -1427,7 +1427,9 @@ func (s *SQLServer) preStart(
// Load the multi-region enum by reading the system database's descriptor.
// This also serves as a simple check to see if a tenant exists (i.e. by
// checking whether the system db has been bootstrapped).
regionPhysicalRep, err := startup.RunIdempotentWithRetryEx(ctx, "sql get locality",
regionPhysicalRep, err := startup.RunIdempotentWithRetryEx(ctx,
stopper.ShouldQuiesce(),
"sql get locality",
func(ctx context.Context) ([]byte, error) {
res, err := sql.GetLocalityRegionEnumPhysicalRepresentation(
ctx, s.internalDB, keys.SystemDatabaseID, s.distSQLServer.Locality,
Expand Down Expand Up @@ -1465,7 +1467,9 @@ func (s *SQLServer) preStart(
// ID. Otherwise, allow our SQL instance ID to be generated by
// SQL.
nodeID, hasNodeID := s.sqlIDContainer.OptionalNodeID()
instance, err := startup.RunIdempotentWithRetryEx(ctx, "sql create node instance row",
instance, err := startup.RunIdempotentWithRetryEx(ctx,
stopper.ShouldQuiesce(),
"sql create node instance row",
func(ctx context.Context) (sqlinstance.InstanceInfo, error) {
if hasNodeID {
// Write/acquire our instance row.
Expand Down Expand Up @@ -1554,11 +1558,13 @@ func (s *SQLServer) preStart(

var bootstrapVersion roachpb.Version
if s.execCfg.Codec.ForSystemTenant() {
if err := startup.RunIdempotentWithRetry(ctx, "sql get cluster version", func(ctx context.Context) error {
return s.execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
return txn.GetProto(ctx, keys.BootstrapVersionKey, &bootstrapVersion)
})
}); err != nil {
if err := startup.RunIdempotentWithRetry(ctx,
s.stopper.ShouldQuiesce(),
"sql get cluster version", func(ctx context.Context) error {
return s.execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
return txn.GetProto(ctx, keys.BootstrapVersionKey, &bootstrapVersion)
})
}); err != nil {
return err
}
} else {
Expand Down Expand Up @@ -1613,12 +1619,14 @@ func (s *SQLServer) preStart(
// "system.settings" table of this tenant. This includes both system
// and secondary tenants.
var tenantActiveVersion clusterversion.ClusterVersion
if err := startup.RunIdempotentWithRetry(ctx, "sql get tenant version", func(ctx context.Context) error {
return s.execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) {
tenantActiveVersion, err = s.settingsWatcher.GetClusterVersionFromStorage(ctx, txn)
return err
})
}); err != nil {
if err := startup.RunIdempotentWithRetry(ctx,
s.stopper.ShouldQuiesce(),
"sql get tenant version", func(ctx context.Context) error {
return s.execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) {
tenantActiveVersion, err = s.settingsWatcher.GetClusterVersionFromStorage(ctx, txn)
return err
})
}); err != nil {
return err
}
if s.execCfg.Settings.Version.BinaryVersion().Less(tenantActiveVersion.Version) {
Expand Down
4 changes: 3 additions & 1 deletion pkg/server/tenantsettingswatcher/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,9 @@ func (w *Watcher) startRangeFeed(
) error {
// We need to retry unavailable replicas here. This is only meant to be called
// at server startup.
tableID, err := startup.RunIdempotentWithRetryEx(ctx, "tenant start setting rangefeed",
tableID, err := startup.RunIdempotentWithRetryEx(ctx,
w.stopper.ShouldQuiesce(),
"tenant start setting rangefeed",
func(ctx context.Context) (descpb.ID, error) {
return sysTableResolver.LookupSystemTableID(ctx, systemschema.TenantSettingsTable.GetName())
})
Expand Down
1 change: 1 addition & 0 deletions pkg/upgrade/upgrademanager/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ go_library(
"//pkg/upgrade/upgrades",
"//pkg/util/log",
"//pkg/util/startup",
"//pkg/util/stop",
"//pkg/util/uuid",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_logtags//:logtags",
Expand Down
49 changes: 31 additions & 18 deletions pkg/upgrade/upgrademanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/upgrade/upgrades"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/startup"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
Expand Down Expand Up @@ -212,7 +213,9 @@ func (m *Manager) RunPermanentUpgrades(ctx context.Context, upToVersion roachpb.
latest := permanentUpgrades[len(permanentUpgrades)-1]
lastVer := latest.Version()
enterpriseEnabled := base.CCLDistributionAndEnterpriseEnabled(m.settings, m.clusterID)
lastUpgradeCompleted, err := startup.RunIdempotentWithRetryEx(ctx, "check if migration completed",
lastUpgradeCompleted, err := startup.RunIdempotentWithRetryEx(ctx,
m.deps.Stopper.ShouldQuiesce(),
"check if migration completed",
func(ctx context.Context) (bool, error) {
return migrationstable.CheckIfMigrationCompleted(
ctx, lastVer, nil /* txn */, m.ie,
Expand Down Expand Up @@ -256,7 +259,7 @@ func (m *Manager) RunPermanentUpgrades(ctx context.Context, upToVersion roachpb.
//
// TODO(andrei): Get rid of this once compatibility with 22.2 is not necessary.
startupMigrationAlreadyRan, err := checkOldStartupMigrationRan(
ctx, u.V22_2StartupMigrationName(), m.deps.DB.KV(), m.codec)
ctx, m.deps.Stopper, u.V22_2StartupMigrationName(), m.deps.DB.KV(), m.codec)
if err != nil {
return err
}
Expand All @@ -267,9 +270,11 @@ func (m *Manager) RunPermanentUpgrades(ctx context.Context, upToVersion roachpb.
u.Version())
// Mark the upgrade as completed so that we can get rid of this logic when
// compatibility with 22.2 is no longer necessary.
if err := startup.RunIdempotentWithRetry(ctx, "mark upgrade complete", func(ctx context.Context) (err error) {
return migrationstable.MarkMigrationCompletedIdempotent(ctx, m.ie, u.Version())
}); err != nil {
if err := startup.RunIdempotentWithRetry(ctx,
m.deps.Stopper.ShouldQuiesce(),
"mark upgrade complete", func(ctx context.Context) (err error) {
return migrationstable.MarkMigrationCompletedIdempotent(ctx, m.ie, u.Version())
}); err != nil {
return err
}
continue
Expand All @@ -287,13 +292,15 @@ func (m *Manager) RunPermanentUpgrades(ctx context.Context, upToVersion roachpb.
// old startupmigration with the given name has run. If it did, the
// corresponding upgrade should not run.
func checkOldStartupMigrationRan(
ctx context.Context, migrationName string, db *kv.DB, codec keys.SQLCodec,
ctx context.Context, stopper *stop.Stopper, migrationName string, db *kv.DB, codec keys.SQLCodec,
) (bool, error) {
if migrationName == "" {
return false, nil
}
migrationKey := append(codec.StartupMigrationKeyPrefix(), roachpb.RKey(migrationName)...)
kv, err := startup.RunIdempotentWithRetryEx(ctx, "check old startup migration",
kv, err := startup.RunIdempotentWithRetryEx(ctx,
stopper.ShouldQuiesce(),
"check old startup migration",
func(ctx context.Context) (kv kv.KeyValue, err error) {
return db.Get(ctx, migrationKey)
})
Expand Down Expand Up @@ -715,23 +722,29 @@ func (m *Manager) runMigration(
alreadyCompleted, alreadyExisting bool
id jobspb.JobID
)
if err := startup.RunIdempotentWithRetry(ctx, "upgrade create job", func(ctx context.Context) (err error) {
alreadyCompleted, alreadyExisting, id, err = m.getOrCreateMigrationJob(ctx, user, version,
mig.Name())
return err
}); alreadyCompleted || err != nil {
if err := startup.RunIdempotentWithRetry(ctx,
m.deps.Stopper.ShouldQuiesce(),
"upgrade create job", func(ctx context.Context) (err error) {
alreadyCompleted, alreadyExisting, id, err = m.getOrCreateMigrationJob(ctx, user, version,
mig.Name())
return err
}); alreadyCompleted || err != nil {
return err
}
if alreadyExisting {
log.Infof(ctx, "waiting for %s", mig.Name())
return startup.RunIdempotentWithRetry(ctx, "upgrade wait jobs", func(ctx context.Context) error {
return m.jr.WaitForJobs(ctx, []jobspb.JobID{id})
})
return startup.RunIdempotentWithRetry(ctx,
m.deps.Stopper.ShouldQuiesce(),
"upgrade wait jobs", func(ctx context.Context) error {
return m.jr.WaitForJobs(ctx, []jobspb.JobID{id})
})
} else {
log.Infof(ctx, "running %s", mig.Name())
return startup.RunIdempotentWithRetry(ctx, "upgrade run jobs", func(ctx context.Context) error {
return m.jr.Run(ctx, []jobspb.JobID{id})
})
return startup.RunIdempotentWithRetry(ctx,
m.deps.Stopper.ShouldQuiesce(),
"upgrade run jobs", func(ctx context.Context) error {
return m.jr.Run(ctx, []jobspb.JobID{id})
})
}
}
}
Expand Down
11 changes: 8 additions & 3 deletions pkg/util/startup/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,9 @@ func WithoutChecks(ctx context.Context) context.Context {
// retry should be performed explicitly while using WithoutChecks context
// to suppress safety mechanisms.
func RunIdempotentWithRetry(
ctx context.Context, opName string, f func(ctx context.Context) error,
ctx context.Context, quiesce <-chan struct{}, opName string, f func(ctx context.Context) error,
) error {
_, err := RunIdempotentWithRetryEx(ctx, opName, func(ctx context.Context) (any, error) {
_, err := RunIdempotentWithRetryEx(ctx, quiesce, opName, func(ctx context.Context) (any, error) {
return nil, f(ctx)
})
return err
Expand All @@ -124,13 +124,18 @@ func RunIdempotentWithRetry(
// RunIdempotentWithRetryEx runs a function that returns a value, with startup retry.
// See RunIdempotentWithRetry for important details.
func RunIdempotentWithRetryEx[T any](
ctx context.Context, opName string, f func(ctx context.Context) (T, error),
ctx context.Context,
quiesce <-chan struct{},
opName string,
f func(ctx context.Context) (T, error),
) (T, error) {
ctx = context.WithValue(ctx, startupRetryKey{}, "in retry")
every := log.Every(5 * time.Second)
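// Retry failures indefinitely until the context is cancelled or, through the
// Closer set on retryOpts below, until the quiesce channel is closed.
// NOTE(review): the Closer only takes effect if the retry loop is started
// with retryOpts rather than the shared startupRetryOpts.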
var result T
var err error
retryOpts := startupRetryOpts
retryOpts.Closer = quiesce
for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); {
result, err = f(ctx)
if err == nil {
Expand Down