From f849ef6368c89ccb09beaaa790c5fb6aab844fb8 Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Tue, 4 Apr 2023 14:26:35 +0200 Subject: [PATCH] server,util: avoid retry forever during startup upon premature shutdown We need to have the `RunIdempotentWithRetry` calls abort if the surrounding server shuts down prematurely. This happens most frequently in tests that fail for another reason, and also in multitenancy tests with multiple tenant servers side-by-side. Release note: None --- .../keyvissubscriber/boundary_subscriber.go | 3 +- pkg/server/node.go | 1 + pkg/server/server_controller_orchestration.go | 4 +- pkg/server/server_sql.go | 34 ++++++++----- pkg/server/tenantsettingswatcher/watcher.go | 4 +- pkg/upgrade/upgrademanager/BUILD.bazel | 1 + pkg/upgrade/upgrademanager/manager.go | 49 ++++++++++++------- pkg/util/startup/retry.go | 11 +++-- 8 files changed, 70 insertions(+), 37 deletions(-) diff --git a/pkg/keyvisualizer/keyvissubscriber/boundary_subscriber.go b/pkg/keyvisualizer/keyvissubscriber/boundary_subscriber.go index 1a17f5678db8..0d2429d8bb53 100644 --- a/pkg/keyvisualizer/keyvissubscriber/boundary_subscriber.go +++ b/pkg/keyvisualizer/keyvissubscriber/boundary_subscriber.go @@ -51,7 +51,8 @@ func Start( handleBoundaryUpdate func(update *keyvispb.UpdateBoundariesRequest), ) error { - tableID, err := startup.RunIdempotentWithRetryEx(ctx, "obs lookup system table", + tableID, err := startup.RunIdempotentWithRetryEx(ctx, stopper.ShouldQuiesce(), + "obs lookup system table", func(ctx context.Context) (descpb.ID, error) { return sysTableResolver.LookupSystemTableID( ctx, systemschema.SpanStatsTenantBoundariesTable.GetName()) diff --git a/pkg/server/node.go b/pkg/server/node.go index 2f490e3424b9..39166ff8538a 100644 --- a/pkg/server/node.go +++ b/pkg/server/node.go @@ -1011,6 +1011,7 @@ func (n *Node) startWriteNodeStatus(frequency time.Duration) error { // will only update the key if it exists, to avoid race conditions during // node decommissioning, 
so we have to error out if we can't create it. if err := startup.RunIdempotentWithRetry(ctx, + n.stopper.ShouldQuiesce(), "kv write node status", func(ctx context.Context) error { return n.writeNodeStatus(ctx, 0 /* alertTTL */, false /* mustExist */) }); err != nil { diff --git a/pkg/server/server_controller_orchestration.go b/pkg/server/server_controller_orchestration.go index 3abb03588bbd..a5459a853502 100644 --- a/pkg/server/server_controller_orchestration.go +++ b/pkg/server/server_controller_orchestration.go @@ -112,7 +112,9 @@ func (c *serverController) startInitialSecondaryTenantServers( ctx context.Context, ie isql.Executor, ) error { // The list of tenants that should have a running server. - reqTenants, err := startup.RunIdempotentWithRetryEx(ctx, "get expected running tenants", + reqTenants, err := startup.RunIdempotentWithRetryEx(ctx, + c.stopper.ShouldQuiesce(), + "get expected running tenants", func(ctx context.Context) ([]roachpb.TenantName, error) { return c.getExpectedRunningTenants(ctx, ie) }) diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index 16652914a2c1..2fb61cdc6bd2 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -1427,7 +1427,9 @@ func (s *SQLServer) preStart( // Load the multi-region enum by reading the system database's descriptor. // This also serves as a simple check to see if a tenant exist (i.e. by // checking whether the system db has been bootstrapped). - regionPhysicalRep, err := startup.RunIdempotentWithRetryEx(ctx, "sql get locality", + regionPhysicalRep, err := startup.RunIdempotentWithRetryEx(ctx, + stopper.ShouldQuiesce(), + "sql get locality", func(ctx context.Context) ([]byte, error) { res, err := sql.GetLocalityRegionEnumPhysicalRepresentation( ctx, s.internalDB, keys.SystemDatabaseID, s.distSQLServer.Locality, @@ -1465,7 +1467,9 @@ func (s *SQLServer) preStart( // ID. Otherwise, allow our SQL instance ID to be generated by // SQL. 
nodeID, hasNodeID := s.sqlIDContainer.OptionalNodeID() - instance, err := startup.RunIdempotentWithRetryEx(ctx, "sql create node instance row", + instance, err := startup.RunIdempotentWithRetryEx(ctx, + stopper.ShouldQuiesce(), + "sql create node instance row", func(ctx context.Context) (sqlinstance.InstanceInfo, error) { if hasNodeID { // Write/acquire our instance row. @@ -1554,11 +1558,13 @@ func (s *SQLServer) preStart( var bootstrapVersion roachpb.Version if s.execCfg.Codec.ForSystemTenant() { - if err := startup.RunIdempotentWithRetry(ctx, "sql get cluster version", func(ctx context.Context) error { - return s.execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - return txn.GetProto(ctx, keys.BootstrapVersionKey, &bootstrapVersion) - }) - }); err != nil { + if err := startup.RunIdempotentWithRetry(ctx, + s.stopper.ShouldQuiesce(), + "sql get cluster version", func(ctx context.Context) error { + return s.execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + return txn.GetProto(ctx, keys.BootstrapVersionKey, &bootstrapVersion) + }) + }); err != nil { return err } } else { @@ -1613,12 +1619,14 @@ func (s *SQLServer) preStart( // "system.settings" table of this tenant. This includes both system // and secondary tenants. 
var tenantActiveVersion clusterversion.ClusterVersion - if err := startup.RunIdempotentWithRetry(ctx, "sql get tenant version", func(ctx context.Context) error { - return s.execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) { - tenantActiveVersion, err = s.settingsWatcher.GetClusterVersionFromStorage(ctx, txn) - return err - }) - }); err != nil { + if err := startup.RunIdempotentWithRetry(ctx, + s.stopper.ShouldQuiesce(), + "sql get tenant version", func(ctx context.Context) error { + return s.execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) { + tenantActiveVersion, err = s.settingsWatcher.GetClusterVersionFromStorage(ctx, txn) + return err + }) + }); err != nil { return err } if s.execCfg.Settings.Version.BinaryVersion().Less(tenantActiveVersion.Version) { diff --git a/pkg/server/tenantsettingswatcher/watcher.go b/pkg/server/tenantsettingswatcher/watcher.go index d0b4d4d15cc8..5faf8ab30cfd 100644 --- a/pkg/server/tenantsettingswatcher/watcher.go +++ b/pkg/server/tenantsettingswatcher/watcher.go @@ -98,7 +98,9 @@ func (w *Watcher) startRangeFeed( ) error { // We need to retry unavailable replicas here. This is only meant to be called // at server startup. 
- tableID, err := startup.RunIdempotentWithRetryEx(ctx, "tenant start setting rangefeed", + tableID, err := startup.RunIdempotentWithRetryEx(ctx, + w.stopper.ShouldQuiesce(), + "tenant start setting rangefeed", func(ctx context.Context) (descpb.ID, error) { return sysTableResolver.LookupSystemTableID(ctx, systemschema.TenantSettingsTable.GetName()) }) diff --git a/pkg/upgrade/upgrademanager/BUILD.bazel b/pkg/upgrade/upgrademanager/BUILD.bazel index 99e0f0db4852..3d1bb2db6f65 100644 --- a/pkg/upgrade/upgrademanager/BUILD.bazel +++ b/pkg/upgrade/upgrademanager/BUILD.bazel @@ -31,6 +31,7 @@ go_library( "//pkg/upgrade/upgrades", "//pkg/util/log", "//pkg/util/startup", + "//pkg/util/stop", "//pkg/util/uuid", "@com_github_cockroachdb_errors//:errors", "@com_github_cockroachdb_logtags//:logtags", diff --git a/pkg/upgrade/upgrademanager/manager.go b/pkg/upgrade/upgrademanager/manager.go index 0300743af4dd..4cb4f3d07524 100644 --- a/pkg/upgrade/upgrademanager/manager.go +++ b/pkg/upgrade/upgrademanager/manager.go @@ -41,6 +41,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/upgrade/upgrades" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/startup" + "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" "github.com/cockroachdb/logtags" @@ -212,7 +213,9 @@ func (m *Manager) RunPermanentUpgrades(ctx context.Context, upToVersion roachpb. 
latest := permanentUpgrades[len(permanentUpgrades)-1] lastVer := latest.Version() enterpriseEnabled := base.CCLDistributionAndEnterpriseEnabled(m.settings, m.clusterID) - lastUpgradeCompleted, err := startup.RunIdempotentWithRetryEx(ctx, "check if migration completed", + lastUpgradeCompleted, err := startup.RunIdempotentWithRetryEx(ctx, + m.deps.Stopper.ShouldQuiesce(), + "check if migration completed", func(ctx context.Context) (bool, error) { return migrationstable.CheckIfMigrationCompleted( ctx, lastVer, nil /* txn */, m.ie, @@ -256,7 +259,7 @@ func (m *Manager) RunPermanentUpgrades(ctx context.Context, upToVersion roachpb. // // TODO(andrei): Get rid of this once compatibility with 22.2 is not necessary. startupMigrationAlreadyRan, err := checkOldStartupMigrationRan( - ctx, u.V22_2StartupMigrationName(), m.deps.DB.KV(), m.codec) + ctx, m.deps.Stopper, u.V22_2StartupMigrationName(), m.deps.DB.KV(), m.codec) if err != nil { return err } @@ -267,9 +270,11 @@ func (m *Manager) RunPermanentUpgrades(ctx context.Context, upToVersion roachpb. u.Version()) // Mark the upgrade as completed so that we can get rid of this logic when // compatibility with 22.2 is no longer necessary. - if err := startup.RunIdempotentWithRetry(ctx, "mark upgrade complete", func(ctx context.Context) (err error) { - return migrationstable.MarkMigrationCompletedIdempotent(ctx, m.ie, u.Version()) - }); err != nil { + if err := startup.RunIdempotentWithRetry(ctx, + m.deps.Stopper.ShouldQuiesce(), + "mark upgrade complete", func(ctx context.Context) (err error) { + return migrationstable.MarkMigrationCompletedIdempotent(ctx, m.ie, u.Version()) + }); err != nil { return err } continue @@ -287,13 +292,15 @@ func (m *Manager) RunPermanentUpgrades(ctx context.Context, upToVersion roachpb. // old startupmigration with the given name has run. If it did, the // corresponding upgrade should not run. 
func checkOldStartupMigrationRan( - ctx context.Context, migrationName string, db *kv.DB, codec keys.SQLCodec, + ctx context.Context, stopper *stop.Stopper, migrationName string, db *kv.DB, codec keys.SQLCodec, ) (bool, error) { if migrationName == "" { return false, nil } migrationKey := append(codec.StartupMigrationKeyPrefix(), roachpb.RKey(migrationName)...) - kv, err := startup.RunIdempotentWithRetryEx(ctx, "check old startup migration", + kv, err := startup.RunIdempotentWithRetryEx(ctx, + stopper.ShouldQuiesce(), + "check old startup migration", func(ctx context.Context) (kv kv.KeyValue, err error) { return db.Get(ctx, migrationKey) }) @@ -715,23 +722,29 @@ func (m *Manager) runMigration( alreadyCompleted, alreadyExisting bool id jobspb.JobID ) - if err := startup.RunIdempotentWithRetry(ctx, "upgrade create job", func(ctx context.Context) (err error) { - alreadyCompleted, alreadyExisting, id, err = m.getOrCreateMigrationJob(ctx, user, version, - mig.Name()) - return err - }); alreadyCompleted || err != nil { + if err := startup.RunIdempotentWithRetry(ctx, + m.deps.Stopper.ShouldQuiesce(), + "upgrade create job", func(ctx context.Context) (err error) { + alreadyCompleted, alreadyExisting, id, err = m.getOrCreateMigrationJob(ctx, user, version, + mig.Name()) + return err + }); alreadyCompleted || err != nil { return err } if alreadyExisting { log.Infof(ctx, "waiting for %s", mig.Name()) - return startup.RunIdempotentWithRetry(ctx, "upgrade wait jobs", func(ctx context.Context) error { - return m.jr.WaitForJobs(ctx, []jobspb.JobID{id}) - }) + return startup.RunIdempotentWithRetry(ctx, + m.deps.Stopper.ShouldQuiesce(), + "upgrade wait jobs", func(ctx context.Context) error { + return m.jr.WaitForJobs(ctx, []jobspb.JobID{id}) + }) } else { log.Infof(ctx, "running %s", mig.Name()) - return startup.RunIdempotentWithRetry(ctx, "upgrade run jobs", func(ctx context.Context) error { - return m.jr.Run(ctx, []jobspb.JobID{id}) - }) + return 
startup.RunIdempotentWithRetry(ctx, + m.deps.Stopper.ShouldQuiesce(), + "upgrade run jobs", func(ctx context.Context) error { + return m.jr.Run(ctx, []jobspb.JobID{id}) + }) } } } diff --git a/pkg/util/startup/retry.go b/pkg/util/startup/retry.go index 636853aff935..97faabab20fd 100644 --- a/pkg/util/startup/retry.go +++ b/pkg/util/startup/retry.go @@ -113,9 +113,9 @@ func WithoutChecks(ctx context.Context) context.Context { // retry should be performed explicitly while using WithoutChecks context // to suppress safety mechanisms. func RunIdempotentWithRetry( - ctx context.Context, opName string, f func(ctx context.Context) error, + ctx context.Context, quiesce <-chan struct{}, opName string, f func(ctx context.Context) error, ) error { - _, err := RunIdempotentWithRetryEx(ctx, opName, func(ctx context.Context) (any, error) { + _, err := RunIdempotentWithRetryEx(ctx, quiesce, opName, func(ctx context.Context) (any, error) { return nil, f(ctx) }) return err @@ -124,13 +124,18 @@ func RunIdempotentWithRetry( // RunIdempotentWithRetryEx run function returning value with startup retry. // See RunIdempotentWithRetry for important details. func RunIdempotentWithRetryEx[T any]( - ctx context.Context, opName string, f func(ctx context.Context) (T, error), + ctx context.Context, + quiesce <-chan struct{}, + opName string, + f func(ctx context.Context) (T, error), ) (T, error) { ctx = context.WithValue(ctx, startupRetryKey{}, "in retry") every := log.Every(5 * time.Second) // Retry failures indefinitely until context is cancelled. var result T var err error + retryOpts := startupRetryOpts + retryOpts.Closer = quiesce for r := retry.StartWithCtx(ctx, startupRetryOpts); r.Next(); { result, err = f(ctx) if err == nil {