Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server: retry ambiguous kv/sql operations on node startup #97710

Merged
merged 2 commits into from
Apr 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -644,6 +644,7 @@ ALL_TESTS = [
"//pkg/util/shuffle:shuffle_test",
"//pkg/util/slidingwindow:slidingwindow_test",
"//pkg/util/span:span_test",
"//pkg/util/startup:startup_test",
"//pkg/util/stop:stop_test",
"//pkg/util/stringarena:stringarena_test",
"//pkg/util/strutil:strutil_test",
Expand Down Expand Up @@ -2237,6 +2238,8 @@ GO_TARGETS = [
"//pkg/util/sort:sort",
"//pkg/util/span:span",
"//pkg/util/span:span_test",
"//pkg/util/startup:startup",
"//pkg/util/startup:startup_test",
"//pkg/util/stop:stop",
"//pkg/util/stop:stop_test",
"//pkg/util/stringarena:stringarena",
Expand Down Expand Up @@ -3307,6 +3310,7 @@ GET_X_DATA_TARGETS = [
"//pkg/util/slidingwindow:get_x_data",
"//pkg/util/sort:get_x_data",
"//pkg/util/span:get_x_data",
"//pkg/util/startup:get_x_data",
"//pkg/util/stop:get_x_data",
"//pkg/util/stringarena:get_x_data",
"//pkg/util/stringencoding:get_x_data",
Expand Down
2 changes: 2 additions & 0 deletions pkg/keyvisualizer/keyvissubscriber/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ go_library(
"//pkg/roachpb",
"//pkg/settings/cluster",
"//pkg/sql/catalog",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/systemschema",
"//pkg/sql/rowenc",
"//pkg/sql/rowenc/valueside",
Expand All @@ -23,6 +24,7 @@ go_library(
"//pkg/util/hlc",
"//pkg/util/log",
"//pkg/util/protoutil",
"//pkg/util/startup",
"//pkg/util/stop",
],
)
Expand Down
10 changes: 7 additions & 3 deletions pkg/keyvisualizer/keyvissubscriber/boundary_subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc/valueside"
Expand All @@ -29,6 +30,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/startup"
"github.com/cockroachdb/cockroach/pkg/util/stop"
)

Expand All @@ -49,9 +51,11 @@ func Start(
handleBoundaryUpdate func(update *keyvispb.UpdateBoundariesRequest),
) error {

tableID, err := sysTableResolver.LookupSystemTableID(
ctx, systemschema.SpanStatsTenantBoundariesTable.GetName())

tableID, err := startup.RunIdempotentWithRetryEx(ctx, "obs lookup system table",
func(ctx context.Context) (descpb.ID, error) {
return sysTableResolver.LookupSystemTableID(
ctx, systemschema.SpanStatsTenantBoundariesTable.GetName())
})
if err != nil {
return err
}
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvclient/kvcoord/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ go_library(
"//pkg/util/quotapool",
"//pkg/util/retry",
"//pkg/util/shuffle",
"//pkg/util/startup",
"//pkg/util/stop",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
Expand Down
3 changes: 3 additions & 0 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/quotapool"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/startup"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
Expand Down Expand Up @@ -802,6 +803,8 @@ func unsetCanForwardReadTimestampFlag(ba *kvpb.BatchRequest) {
func (ds *DistSender) Send(
ctx context.Context, ba *kvpb.BatchRequest,
) (*kvpb.BatchResponse, *kvpb.Error) {
startup.AssertStartupRetry(ctx)

ds.incrementBatchCounters(ba)

if pErr := ds.initAndVerifyBatch(ctx, ba); pErr != nil {
Expand Down
2 changes: 2 additions & 0 deletions pkg/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,7 @@ go_library(
"//pkg/util/rangedesc",
"//pkg/util/retry",
"//pkg/util/schedulerlatency",
"//pkg/util/startup",
"//pkg/util/stop",
"//pkg/util/strutil",
"//pkg/util/syncutil",
Expand Down Expand Up @@ -421,6 +422,7 @@ go_test(
"server_http_test.go",
"server_import_ts_test.go",
"server_internal_executor_factory_test.go",
"server_startup_test.go",
"server_systemlog_gc_test.go",
"server_test.go",
"settings_cache_test.go",
Expand Down
5 changes: 5 additions & 0 deletions pkg/server/import_ts.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/ts"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/startup"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/errors/oserror"
yaml "gopkg.in/yaml.v2"
Expand All @@ -38,6 +39,10 @@ import (
const maxBatchSize = 10000

func maybeImportTS(ctx context.Context, s *Server) (returnErr error) {
// We don't want to do startup retries as this is not meant to be run in
// production.
ctx = startup.WithoutChecks(ctx)

var deferError func(error)
{
var defErr error
Expand Down
55 changes: 30 additions & 25 deletions pkg/server/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log/logpb"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/pprofutil"
"github.com/cockroachdb/cockroach/pkg/util/startup"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
Expand Down Expand Up @@ -1009,35 +1010,39 @@ func (n *Node) startWriteNodeStatus(frequency time.Duration) error {
// Immediately record summaries once on server startup. The update loop below
// will only update the key if it exists, to avoid race conditions during
// node decommissioning, so we have to error out if we can't create it.
if err := n.writeNodeStatus(ctx, 0 /* alertTTL */, false /* mustExist */); err != nil {
if err := startup.RunIdempotentWithRetry(ctx,
"kv write node status", func(ctx context.Context) error {
return n.writeNodeStatus(ctx, 0 /* alertTTL */, false /* mustExist */)
}); err != nil {
return errors.Wrap(err, "error recording initial status summaries")
}
return n.stopper.RunAsyncTask(ctx, "write-node-status", func(ctx context.Context) {
// Write a status summary immediately; this helps the UI remain
// responsive when new nodes are added.
ticker := time.NewTicker(frequency)
defer ticker.Stop()
for {
select {
case <-ticker.C:
// Use an alertTTL of twice the ticker frequency. This makes sure that
// alerts don't disappear and reappear spuriously while at the same
// time ensuring that an alert doesn't linger for too long after having
// resolved.
//
// The status key must already exist, to avoid race conditions
// during decommissioning of this node. Decommissioning may be
// carried out by a different node, so this avoids resurrecting
// the status entry after the decommissioner has removed it.
// See Server.Decommission().
if err := n.writeNodeStatus(ctx, 2*frequency, true /* mustExist */); err != nil {
log.Warningf(ctx, "error recording status summaries: %s", err)
return n.stopper.RunAsyncTask(ctx, "write-node-status",
func(ctx context.Context) {
// Write a status summary immediately; this helps the UI remain
// responsive when new nodes are added.
ticker := time.NewTicker(frequency)
defer ticker.Stop()
for {
select {
case <-ticker.C:
// Use an alertTTL of twice the ticker frequency. This makes sure that
// alerts don't disappear and reappear spuriously while at the same
// time ensuring that an alert doesn't linger for too long after having
// resolved.
//
// The status key must already exist, to avoid race conditions
// during decommissioning of this node. Decommissioning may be
// carried out by a different node, so this avoids resurrecting
// the status entry after the decommissioner has removed it.
// See Server.Decommission().
if err := n.writeNodeStatus(ctx, 2*frequency, true /* mustExist */); err != nil {
log.Warningf(ctx, "error recording status summaries: %s", err)
}
case <-n.stopper.ShouldQuiesce():
return
}
case <-n.stopper.ShouldQuiesce():
return
}
}
})
})
}

// writeNodeStatus retrieves status summaries from the supplied
Expand Down
3 changes: 3 additions & 0 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/rangedesc"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/schedulerlatency"
"github.com/cockroachdb/cockroach/pkg/util/startup"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil/ptp"
Expand Down Expand Up @@ -1285,6 +1286,8 @@ func (li listenerInfo) Iter() map[string]string {
// should represent the general startup operation.
func (s *Server) PreStart(ctx context.Context) error {
ctx = s.AnnotateCtx(ctx)
done := startup.Begin(ctx)
defer done()

// The following initialization is mirrored in
// (*SQLServerWrapper).PreStart. Please keep them in sync.
Expand Down
6 changes: 5 additions & 1 deletion pkg/server/server_controller_orchestration.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log/eventpb"
"github.com/cockroachdb/cockroach/pkg/util/log/logpb"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/startup"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -111,7 +112,10 @@ func (c *serverController) startInitialSecondaryTenantServers(
ctx context.Context, ie isql.Executor,
) error {
// The list of tenants that should have a running server.
reqTenants, err := c.getExpectedRunningTenants(ctx, ie)
reqTenants, err := startup.RunIdempotentWithRetryEx(ctx, "get expected running tenants",
func(ctx context.Context) ([]roachpb.TenantName, error) {
return c.getExpectedRunningTenants(ctx, ie)
})
if err != nil {
return err
}
Expand Down
86 changes: 48 additions & 38 deletions pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/netutil/addr"
"github.com/cockroachdb/cockroach/pkg/util/rangedesc"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/startup"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
Expand Down Expand Up @@ -1422,10 +1423,17 @@ func (s *SQLServer) preStart(
// Load the multi-region enum by reading the system database's descriptor.
// This also serves as a simple check to see if a tenant exist (i.e. by
// checking whether the system db has been bootstrapped).
regionPhysicalRep, err := sql.GetLocalityRegionEnumPhysicalRepresentation(
ctx, s.internalDB, keys.SystemDatabaseID, s.distSQLServer.Locality,
)
if err != nil && !errors.Is(err, sql.ErrNotMultiRegionDatabase) {
regionPhysicalRep, err := startup.RunIdempotentWithRetryEx(ctx, "sql get locality",
func(ctx context.Context) ([]byte, error) {
res, err := sql.GetLocalityRegionEnumPhysicalRepresentation(
ctx, s.internalDB, keys.SystemDatabaseID, s.distSQLServer.Locality,
)
if errors.Is(err, sql.ErrNotMultiRegionDatabase) {
err = nil
}
return res, err
})
if err != nil {
return err
}

Expand Down Expand Up @@ -1453,35 +1461,33 @@ func (s *SQLServer) preStart(
// ID. Otherwise, allow our SQL instance ID to be generated by
// SQL.
nodeID, hasNodeID := s.sqlIDContainer.OptionalNodeID()
var instance sqlinstance.InstanceInfo
if hasNodeID {
// Write/acquire our instance row.
instance, err = s.sqlInstanceStorage.CreateNodeInstance(
ctx,
session.ID(),
session.Expiration(),
s.cfg.AdvertiseAddr,
s.cfg.SQLAdvertiseAddr,
s.distSQLServer.Locality,
s.execCfg.Settings.Version.BinaryVersion(),
nodeID,
)
if err != nil {
return err
}
} else {
instance, err = s.sqlInstanceStorage.CreateInstance(
ctx,
session.ID(),
session.Expiration(),
s.cfg.AdvertiseAddr,
s.cfg.SQLAdvertiseAddr,
s.distSQLServer.Locality,
s.execCfg.Settings.Version.BinaryVersion(),
)
if err != nil {
return err
}
instance, err := startup.RunIdempotentWithRetryEx(ctx, "sql create node instance row",
func(ctx context.Context) (sqlinstance.InstanceInfo, error) {
if hasNodeID {
// Write/acquire our instance row.
return s.sqlInstanceStorage.CreateNodeInstance(
ctx,
session.ID(),
session.Expiration(),
s.cfg.AdvertiseAddr,
s.cfg.SQLAdvertiseAddr,
s.distSQLServer.Locality,
s.execCfg.Settings.Version.BinaryVersion(),
nodeID,
)
}
return s.sqlInstanceStorage.CreateInstance(
ctx,
session.ID(),
session.Expiration(),
s.cfg.AdvertiseAddr,
s.cfg.SQLAdvertiseAddr,
s.distSQLServer.Locality,
s.execCfg.Settings.Version.BinaryVersion(),
)
})
if err != nil {
return err
}

// TODO(andrei): Release the instance ID on server shutdown. It is not trivial
Expand Down Expand Up @@ -1544,8 +1550,10 @@ func (s *SQLServer) preStart(

var bootstrapVersion roachpb.Version
if s.execCfg.Codec.ForSystemTenant() {
if err := s.execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
return txn.GetProto(ctx, keys.BootstrapVersionKey, &bootstrapVersion)
if err := startup.RunIdempotentWithRetry(ctx, "sql get cluster version", func(ctx context.Context) error {
return s.execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
return txn.GetProto(ctx, keys.BootstrapVersionKey, &bootstrapVersion)
})
}); err != nil {
return err
}
Expand Down Expand Up @@ -1601,9 +1609,11 @@ func (s *SQLServer) preStart(
// "system.settings" table of this tenant. This includes both system
// and secondary tenants.
var tenantActiveVersion clusterversion.ClusterVersion
if err := s.execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) {
tenantActiveVersion, err = s.settingsWatcher.GetClusterVersionFromStorage(ctx, txn)
return err
if err := startup.RunIdempotentWithRetry(ctx, "sql get tenant version", func(ctx context.Context) error {
return s.execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) {
tenantActiveVersion, err = s.settingsWatcher.GetClusterVersionFromStorage(ctx, txn)
return err
})
}); err != nil {
return err
}
Expand Down
Loading