diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index 494e16cc6d08..8135235ad2dd 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -591,6 +591,7 @@ ALL_TESTS = [ "//pkg/util/shuffle:shuffle_test", "//pkg/util/slidingwindow:slidingwindow_test", "//pkg/util/span:span_test", + "//pkg/util/startup:startup_test", "//pkg/util/stop:stop_test", "//pkg/util/stringarena:stringarena_test", "//pkg/util/strutil:strutil_test", @@ -2034,6 +2035,8 @@ GO_TARGETS = [ "//pkg/util/sort:sort", "//pkg/util/span:span", "//pkg/util/span:span_test", + "//pkg/util/startup:startup", + "//pkg/util/startup:startup_test", "//pkg/util/stop:stop", "//pkg/util/stop:stop_test", "//pkg/util/stringarena:stringarena", @@ -3005,6 +3008,7 @@ GET_X_DATA_TARGETS = [ "//pkg/util/slidingwindow:get_x_data", "//pkg/util/sort:get_x_data", "//pkg/util/span:get_x_data", + "//pkg/util/startup:get_x_data", "//pkg/util/stop:get_x_data", "//pkg/util/stringarena:get_x_data", "//pkg/util/stringencoding:get_x_data", diff --git a/pkg/base/test_server_args.go b/pkg/base/test_server_args.go index 9b6a98d57840..18e2cdd23413 100644 --- a/pkg/base/test_server_args.go +++ b/pkg/base/test_server_args.go @@ -187,6 +187,13 @@ type TestClusterArgs struct { // A copy of an entry from this map will be copied to each individual server // and potentially adjusted according to ReplicationMode. ServerArgsPerNode map[int]TestServerArgs + + // If reusable listeners is true, then restart should keep listeners untouched + // so that servers are kept on the same ports. It is up to the test to set + // proxy listeners to TestServerArgs.Listener that would survive + // net.Listener.Close() and then allow restarted server to use them again. + // See testutils.ListenerRegistry. + ReusableListeners bool } var ( diff --git a/pkg/kv/kvclient/kvcoord/BUILD.bazel b/pkg/kv/kvclient/kvcoord/BUILD.bazel index a944086ffb3e..9d6dc3a210d9 100644 --- a/pkg/kv/kvclient/kvcoord/BUILD.bazel +++ b/pkg/kv/kvclient/kvcoord/BUILD.bazel @@ -78,6 +78,7 @@ go_library( "//pkg/util/quotapool", "//pkg/util/retry", "//pkg/util/shuffle", + "//pkg/util/startup", "//pkg/util/stop", "//pkg/util/syncutil", "//pkg/util/timeutil", diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go index 7f154c63d7ea..a79c14629598 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender.go @@ -39,6 +39,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/cockroach/pkg/util/quotapool" "github.com/cockroachdb/cockroach/pkg/util/retry" + "github.com/cockroachdb/cockroach/pkg/util/startup" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" @@ -776,6 +777,8 @@ func unsetCanForwardReadTimestampFlag(ba *roachpb.BatchRequest) { func (ds *DistSender) Send( ctx context.Context, ba roachpb.BatchRequest, ) (*roachpb.BatchResponse, *roachpb.Error) { + startup.AssertStartupRetry(ctx) + ds.incrementBatchCounters(&ba) // TODO(nvanbenschoten): This causes ba to escape to the heap. 
Either diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel index 8065179a4ac8..291a44d49711 100644 --- a/pkg/server/BUILD.bazel +++ b/pkg/server/BUILD.bazel @@ -258,6 +258,7 @@ go_library( "//pkg/util/quotapool", "//pkg/util/retry", "//pkg/util/schedulerlatency", + "//pkg/util/startup", "//pkg/util/stop", "//pkg/util/syncutil", "//pkg/util/timeutil", @@ -374,6 +375,7 @@ go_test( "server_http_test.go", "server_import_ts_test.go", "server_internal_executor_factory_test.go", + "server_startup_test.go", "server_systemlog_gc_test.go", "server_test.go", "settings_cache_test.go", diff --git a/pkg/server/import_ts.go b/pkg/server/import_ts.go index 50d31bb29729..29b09adeafa5 100644 --- a/pkg/server/import_ts.go +++ b/pkg/server/import_ts.go @@ -28,6 +28,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/ts" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/startup" "github.com/cockroachdb/errors" "github.com/cockroachdb/errors/oserror" yaml "gopkg.in/yaml.v2" @@ -38,6 +39,10 @@ import ( const maxBatchSize = 10000 func maybeImportTS(ctx context.Context, s *Server) (returnErr error) { + // We don't want to do startup retries as this is not meant to be run in + // production. + ctx = startup.WithoutChecks(ctx) + var deferError func(error) { var defErr error diff --git a/pkg/server/node.go b/pkg/server/node.go index 27b89c5f6e61..af5e8407ebf9 100644 --- a/pkg/server/node.go +++ b/pkg/server/node.go @@ -56,6 +56,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/log/logpb" "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/cockroach/pkg/util/pprofutil" + "github.com/cockroachdb/cockroach/pkg/util/startup" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" @@ -940,35 +941,40 @@ func (n *Node) startWriteNodeStatus(frequency time.Duration) error { // Immediately record summaries once on server startup. The update loop below // will only update the key if it exists, to avoid race conditions during // node decommissioning, so we have to error out if we can't create it. - if err := n.writeNodeStatus(ctx, 0 /* alertTTL */, false /* mustExist */); err != nil { + if err := startup.RunIdempotentWithRetry(ctx, + n.stopper.ShouldQuiesce(), + "kv write node status", func(ctx context.Context) error { + return n.writeNodeStatus(ctx, 0 /* alertTTL */, false /* mustExist */) + }); err != nil { return errors.Wrap(err, "error recording initial status summaries") } - return n.stopper.RunAsyncTask(ctx, "write-node-status", func(ctx context.Context) { - // Write a status summary immediately; this helps the UI remain - // responsive when new nodes are added. - ticker := time.NewTicker(frequency) - defer ticker.Stop() - for { - select { - case <-ticker.C: - // Use an alertTTL of twice the ticker frequency. This makes sure that - // alerts don't disappear and reappear spuriously while at the same - // time ensuring that an alert doesn't linger for too long after having - // resolved. - // - // The status key must already exist, to avoid race conditions - // during decommissioning of this node. Decommissioning may be - // carried out by a different node, so this avoids resurrecting - // the status entry after the decommissioner has removed it. - // See Server.Decommission(). 
- if err := n.writeNodeStatus(ctx, 2*frequency, true /* mustExist */); err != nil { - log.Warningf(ctx, "error recording status summaries: %s", err) + return n.stopper.RunAsyncTask(ctx, "write-node-status", + func(ctx context.Context) { + // Write a status summary immediately; this helps the UI remain + // responsive when new nodes are added. + ticker := time.NewTicker(frequency) + defer ticker.Stop() + for { + select { + case <-ticker.C: + // Use an alertTTL of twice the ticker frequency. This makes sure that + // alerts don't disappear and reappear spuriously while at the same + // time ensuring that an alert doesn't linger for too long after having + // resolved. + // + // The status key must already exist, to avoid race conditions + // during decommissioning of this node. Decommissioning may be + // carried out by a different node, so this avoids resurrecting + // the status entry after the decommissioner has removed it. + // See Server.Decommission(). + if err := n.writeNodeStatus(ctx, 2*frequency, true /* mustExist */); err != nil { + log.Warningf(ctx, "error recording status summaries: %s", err) + } + case <-n.stopper.ShouldQuiesce(): + return } - case <-n.stopper.ShouldQuiesce(): - return } - } - }) + }) } // writeNodeStatus retrieves status summaries from the supplied diff --git a/pkg/server/server.go b/pkg/server/server.go index 153da4e78495..9b34d5ba11e6 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -88,6 +88,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/netutil" "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/schedulerlatency" + "github.com/cockroachdb/cockroach/pkg/util/startup" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil/ptp" @@ -1042,6 +1043,8 @@ func (s *Server) Start(ctx context.Context) error { // should represent the general startup operation. func (s *Server) PreStart(ctx context.Context) error { ctx = s.AnnotateCtx(ctx) + done := startup.Begin(ctx) + defer done() // Start the time sanity checker. 
s.startTime = timeutil.Now() diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index d8199fa8b2f9..388f484ae5a4 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -112,6 +112,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/mon" "github.com/cockroachdb/cockroach/pkg/util/netutil" "github.com/cockroachdb/cockroach/pkg/util/netutil/addr" + "github.com/cockroachdb/cockroach/pkg/util/startup" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" @@ -1347,9 +1348,13 @@ func (s *SQLServer) preStart( var bootstrapVersion roachpb.Version if s.execCfg.Codec.ForSystemTenant() { - if err := s.execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - return txn.GetProto(ctx, keys.BootstrapVersionKey, &bootstrapVersion) - }); err != nil { + if err := startup.RunIdempotentWithRetry(ctx, + s.stopper.ShouldQuiesce(), + "sql get cluster version", func(ctx context.Context) error { + return s.execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + return txn.GetProto(ctx, keys.BootstrapVersionKey, &bootstrapVersion) + }) + }); err != nil { return err } } else { diff --git a/pkg/server/server_startup_test.go b/pkg/server/server_startup_test.go new file mode 100644 index 000000000000..d23815cfe3f6 --- /dev/null +++ b/pkg/server/server_startup_test.go @@ -0,0 +1,96 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package server_test + +import ( + "context" + "sync/atomic" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/spanconfig" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/randutil" + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/require" +) + +func TestStartupInjectedFailureSingleNode(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + const failProb = 0.1 + + ctx := context.Background() + + rng, seed := randutil.NewLockedTestRand() + t.Log("TestStartupInjectedFailure random seed", seed) + lReg := testutils.NewListenerRegistry() + defer lReg.Close() + reg := server.NewStickyInMemEnginesRegistry() + defer reg.CloseAllStickyInMemEngines() + + var enableFaults atomic.Bool + args := base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{ + StoreSpecs: []base.StoreSpec{ + { + InMemory: true, + StickyInMemoryEngineID: "1", + }, + }, + Listener: lReg.GetOrFail(t, 0), + Knobs: base.TestingKnobs{ + Server: &server.TestingKnobs{ + StickyEngineRegistry: reg, + }, + SpanConfig: &spanconfig.TestingKnobs{ + // Ensure that scratch range has proper zone config, otherwise it is + // anybody's guess and if we chose it test can fail. 
+ ConfigureScratchRange: true, + }, + Store: &kvserver.StoreTestingKnobs{ + TestingRequestFilter: func(ctx context.Context, br roachpb.BatchRequest, + ) *roachpb.Error { + if enableFaults.Load() { + if rng.Float32() < failProb { + t.Log("injecting fault into range ", br.RangeID) + return roachpb.NewError(roachpb.NewReplicaUnavailableError(errors.New("injected error"), + &roachpb.RangeDescriptor{RangeID: br.RangeID}, roachpb.ReplicaDescriptor{ + NodeID: roachpb.NodeID(1), + StoreID: roachpb.StoreID(1), + })) + } + } + return nil + }, + }, + }, + }, + ReusableListeners: true, + } + tc := testcluster.NewTestCluster(t, 1, args) + tc.Start(t) + defer tc.Stopper().Stop(ctx) + tc.StopServer(0) + enableFaults.Store(true) + lReg.ReopenOrFail(t, 0) + require.NoError(t, tc.RestartServer(0), "failed to restart server") + + // Disable faults to make it easier for cluster to stop. + enableFaults.Store(false) +} diff --git a/pkg/server/tenantsettingswatcher/BUILD.bazel b/pkg/server/tenantsettingswatcher/BUILD.bazel index f48a798ded8c..0d29d9aa5aca 100644 --- a/pkg/server/tenantsettingswatcher/BUILD.bazel +++ b/pkg/server/tenantsettingswatcher/BUILD.bazel @@ -20,6 +20,7 @@ go_library( "//pkg/settings", "//pkg/settings/cluster", "//pkg/sql/catalog", + "//pkg/sql/catalog/descpb", "//pkg/sql/catalog/systemschema", "//pkg/sql/rowenc", "//pkg/sql/rowenc/valueside", @@ -27,6 +28,7 @@ go_library( "//pkg/sql/types", "//pkg/util/hlc", "//pkg/util/log", + "//pkg/util/startup", "//pkg/util/stop", "//pkg/util/syncutil", "@com_github_cockroachdb_errors//:errors", diff --git a/pkg/server/tenantsettingswatcher/watcher.go b/pkg/server/tenantsettingswatcher/watcher.go index 5fa84f73ac00..da1c1f3e46f7 100644 --- a/pkg/server/tenantsettingswatcher/watcher.go +++ b/pkg/server/tenantsettingswatcher/watcher.go @@ -20,9 +20,11 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/catalog" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/startup" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/errors" ) @@ -93,7 +95,17 @@ func (w *Watcher) Start(ctx context.Context, sysTableResolver catalog.SystemTabl func (w *Watcher) startRangeFeed( ctx context.Context, sysTableResolver catalog.SystemTableIDResolver, ) error { - tableID, err := sysTableResolver.LookupSystemTableID(ctx, systemschema.TenantSettingsTable.GetName()) + // We need to retry unavailable replicas here. This is only meant to be called + // at server startup. 
+ var tableID descpb.ID + err := startup.RunIdempotentWithRetry(ctx, + w.stopper.ShouldQuiesce(), + "tenant start setting rangefeed", + func(ctx context.Context) (err error) { + tableID, err = sysTableResolver.LookupSystemTableID(ctx, + systemschema.TenantSettingsTable.GetName()) + return err + }) if err != nil { return err } diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel index 6f06f94d895b..aec28543856b 100644 --- a/pkg/sql/BUILD.bazel +++ b/pkg/sql/BUILD.bazel @@ -494,6 +494,7 @@ go_library( "//pkg/util/randutil", "//pkg/util/retry", "//pkg/util/ring", + "//pkg/util/startup", "//pkg/util/stop", "//pkg/util/syncutil", "//pkg/util/syncutil/singleflight", diff --git a/pkg/sql/internal.go b/pkg/sql/internal.go index 001fcec48227..6a69d5b8661e 100644 --- a/pkg/sql/internal.go +++ b/pkg/sql/internal.go @@ -42,6 +42,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/fsm" "github.com/cockroachdb/cockroach/pkg/util/mon" "github.com/cockroachdb/cockroach/pkg/util/retry" + "github.com/cockroachdb/cockroach/pkg/util/startup" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/errors" @@ -790,6 +791,8 @@ func (ie *InternalExecutor) execInternal( stmt string, qargs ...interface{}, ) (r *rowsIterator, retErr error) { + startup.AssertStartupRetry(ctx) + if err := ie.checkIfTxnIsConsistent(txn); err != nil { return nil, err } diff --git a/pkg/startupmigrations/BUILD.bazel b/pkg/startupmigrations/BUILD.bazel index 8061fda09f73..8f05497d7664 100644 --- a/pkg/startupmigrations/BUILD.bazel +++ b/pkg/startupmigrations/BUILD.bazel @@ -30,6 +30,7 @@ go_library( "//pkg/util/log", "//pkg/util/protoutil", "//pkg/util/retry", + "//pkg/util/startup", "//pkg/util/stop", "//pkg/util/timeutil", "@com_github_cockroachdb_errors//:errors", diff --git a/pkg/startupmigrations/migrations.go b/pkg/startupmigrations/migrations.go index 769724b0c09c..253e4ef726a6 100644 --- a/pkg/startupmigrations/migrations.go +++ b/pkg/startupmigrations/migrations.go @@ -35,6 +35,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/retry" + "github.com/cockroachdb/cockroach/pkg/util/startup" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" @@ -572,7 +573,8 @@ func (m *Manager) EnsureMigrations(ctx context.Context, bootstrapVersion roachpb // Re-get the list of migrations in case any of them were completed between // our initial check and our grabbing of the lease. 
- completedMigrations, err := getCompletedMigrations(ctx, m.db.Scan, m.codec) + completedMigrations, err := getCompletedMigrations(ctx, m.stopper.ShouldQuiesce(), m.db.Scan, + m.codec) if err != nil { return err } @@ -620,7 +622,7 @@ func (m *Manager) EnsureMigrations(ctx context.Context, bootstrapVersion roachpb func (m *Manager) checkIfAllMigrationsAreComplete( ctx context.Context, bootstrapVersion roachpb.Version, scan scanFunc, ) (completedAll bool, _ error) { - completedMigrations, err := getCompletedMigrations(ctx, scan, m.codec) + completedMigrations, err := getCompletedMigrations(ctx, m.stopper.ShouldQuiesce(), scan, m.codec) if err != nil { return false, err } @@ -664,14 +666,23 @@ func (m *Manager) shouldRunMigration( type scanFunc = func(_ context.Context, from, to interface{}, maxRows int64) ([]kv.KeyValue, error) +// This method has baked in startup retry and should not be called on a non +// startup path. This is different from master/23.1 and changes that would reuse +// this in other places are unlikely, but care must be taken in case some fixes +// are backported. func getCompletedMigrations( - ctx context.Context, scan scanFunc, codec keys.SQLCodec, + ctx context.Context, quiesce <-chan struct{}, scan scanFunc, codec keys.SQLCodec, ) (map[string]struct{}, error) { if log.V(1) { log.Info(ctx, "trying to get the list of completed migrations") } prefix := codec.StartupMigrationKeyPrefix() - keyvals, err := scan(ctx, prefix, prefix.PrefixEnd(), 0 /* maxRows */) + var keyvals []kv.KeyValue + err := startup.RunIdempotentWithRetry(ctx, quiesce, "get completed migrations", + func(ctx context.Context) (err error) { + keyvals, err = scan(ctx, prefix, prefix.PrefixEnd(), 0 /* maxRows */) + return err + }) if err != nil { return nil, errors.Wrapf(err, "failed to get list of completed migrations") } diff --git a/pkg/testutils/BUILD.bazel b/pkg/testutils/BUILD.bazel index e0c38611ff5e..67bbaff34e38 100644 --- a/pkg/testutils/BUILD.bazel +++ b/pkg/testutils/BUILD.bazel @@ -11,6 +11,7 @@ go_library( "files.go", "hook.go", "keys.go", + "listener.go", "net.go", "pprof.go", "soon.go", diff --git a/pkg/testutils/listener.go b/pkg/testutils/listener.go new file mode 100644 index 000000000000..65631427954b --- /dev/null +++ b/pkg/testutils/listener.go @@ -0,0 +1,168 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package testutils + +import ( + "net" + "testing" + + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/require" +) + +// ListenerRegistry is a registry for listener sockets that allows TestServers +// to reuse listener sockets and keep them on the same ports throughout server +// restarts. +// Tests rely on net.Listen on port 0 to open first available port and uses its +// details to let TestServers connect to each other. Tests can't rely on fixed +// ports because it must be possible to run lots of tests in parallel. When +// TestServer is restarted, it would close its listener and reopen a new one +// on a different port as its own port might be reused by that time. 
+// This registry provides listener wrappers that could be associated with server +// ids and injected into TestServers normal way. Listeners will not close +// actual network sockets when closed, but will pause accepting connections. +// Test could then specifically resume listeners prior to restarting servers. +type ListenerRegistry struct { + listeners map[int]*reusableListener +} + +// NewListenerRegistry creates a registry of reusable listeners to be used with +// test cluster. Once created use ListenerRegistry.GetOrFail to create new +// listeners and inject them into test cluster using Listener field of +// base.TestServerArgs. +func NewListenerRegistry() ListenerRegistry { + return ListenerRegistry{listeners: make(map[int]*reusableListener)} +} + +// GetOrFail returns an existing reusable socket listener or creates a new one +// on a random local port. +func (r *ListenerRegistry) GetOrFail(t *testing.T, idx int) net.Listener { + t.Helper() + if l, ok := r.listeners[idx]; ok { + return l + } + nl, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err, "failed to create network listener") + l := &reusableListener{ + id: idx, + wrapped: nl, + acceptC: make(chan acceptResult), + stopC: make(chan interface{}), + } + l.resume() + r.listeners[idx] = l + go l.run() + return l +} + +// ReopenOrFail will allow accepting more connections on existing shared +// listener if it was previously closed. If it was not closed, nothing happens. +// If listener wasn't created previously, test failure is raised. +func (r *ListenerRegistry) ReopenOrFail(t *testing.T, idx int) { + l, ok := r.listeners[idx] + require.Truef(t, ok, "socket for id %d is not open", idx) + l.resume() +} + +// Close closes and deletes all previously created shared listeners. +func (r *ListenerRegistry) Close() { + for k, v := range r.listeners { + _ = v.wrapped.Close() + close(v.stopC) + delete(r.listeners, k) + } +} + +type acceptResult struct { + conn net.Conn + err error +} + +type reusableListener struct { + id int + wrapped net.Listener + acceptC chan acceptResult + pauseMu struct { + syncutil.RWMutex + pauseC chan interface{} + } + stopC chan interface{} +} + +func (l *reusableListener) run() { + defer func() { + close(l.acceptC) + }() + for { + c, err := l.wrapped.Accept() + if errors.Is(err, net.ErrClosed) { + return + } + select { + case l.acceptC <- acceptResult{ + conn: c, + err: err, + }: + case <-l.pauseC(): + _ = c.Close() + case <-l.stopC: + _ = c.Close() + return + } + } +} + +func (l *reusableListener) pauseC() <-chan interface{} { + l.pauseMu.RLock() + defer l.pauseMu.RUnlock() + return l.pauseMu.pauseC +} + +func (l *reusableListener) resume() { + l.pauseMu.Lock() + defer l.pauseMu.Unlock() + l.pauseMu.pauseC = make(chan interface{}) +} + +// Accept implements net.Listener interface. +func (l *reusableListener) Accept() (net.Conn, error) { + select { + case c, ok := <-l.acceptC: + if !ok { + return nil, net.ErrClosed + } + return c.conn, c.err + case <-l.pauseC(): + return nil, net.ErrClosed + } +} + +// Close implements net.Listener interface. Since listener is reused, close +// doesn't close underlying listener and it is the responsibility of +// ListenerRegistry that provided it to close wrapped listener when registry +// is closed. +func (l *reusableListener) Close() error { + l.pauseMu.Lock() + defer l.pauseMu.Unlock() + select { + case <-l.pauseMu.pauseC: + // Already paused, nothing to do. + default: + close(l.pauseMu.pauseC) + } + return nil +} + +// Addr implements net.Listener interface. 
+func (l *reusableListener) Addr() net.Addr { + return l.wrapped.Addr() +} diff --git a/pkg/testutils/testcluster/BUILD.bazel b/pkg/testutils/testcluster/BUILD.bazel index 57cd5e2ac260..8085213f5c5e 100644 --- a/pkg/testutils/testcluster/BUILD.bazel +++ b/pkg/testutils/testcluster/BUILD.bazel @@ -17,6 +17,7 @@ go_library( "//pkg/rpc/nodedialer", "//pkg/server", "//pkg/server/serverpb", + "//pkg/spanconfig", "//pkg/sql/catalog", "//pkg/sql/randgen", "//pkg/storage", diff --git a/pkg/testutils/testcluster/testcluster.go b/pkg/testutils/testcluster/testcluster.go index bb10c027eb59..4cbbfab508a2 100644 --- a/pkg/testutils/testcluster/testcluster.go +++ b/pkg/testutils/testcluster/testcluster.go @@ -32,6 +32,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/server/serverpb" + "github.com/cockroachdb/cockroach/pkg/spanconfig" "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/randgen" "github.com/cockroachdb/cockroach/pkg/storage" @@ -1397,6 +1398,36 @@ func (tc *TestCluster) WaitForFullReplication() error { return nil } +// WaitFor5NodeReplication ensures that zone configs are applied and +// up-replication is performed with new zone configs. This is the case for 5+ +// node clusters. +// TODO: This code should be moved into WaitForFullReplication once #99812 is +// fixed so that all test would benefit from this check implicitly. +// This bug currently prevents LastUpdated to tick in metamorphic tests +// with kv.expiration_leases_only.enabled = true. +func (tc *TestCluster) WaitFor5NodeReplication() error { + if len(tc.Servers) > 4 && tc.ReplicationMode() == base.ReplicationAuto { + // We need to wait for zone config propagations before we could check + // conformance since zone configs are propagated synchronously. + // Generous timeout is added to allow rangefeeds to catch up. On startup + // they could get delayed making test to fail. + now := tc.Server(0).Clock().Now() + for _, s := range tc.Servers { + scs := s.SpanConfigKVSubscriber().(spanconfig.KVSubscriber) + if err := testutils.SucceedsSoonError(func() error { + if scs.LastUpdated().Less(now) { + return errors.New("zone configs not propagated") + } + return nil + }); err != nil { + return err + } + } + return tc.WaitForFullReplication() + } + return nil +} + // WaitForNodeStatuses waits until a NodeStatus is persisted for every node and // store in the cluster. func (tc *TestCluster) WaitForNodeStatuses(t testing.TB) { @@ -1549,21 +1580,23 @@ func (tc *TestCluster) RestartServerWithInspect(idx int, inspect func(s *server. } serverArgs := tc.serverArgs[idx] - if idx == 0 { - // If it's the first server, then we need to restart the RPC listener by hand. - // Look at NewTestCluster for more details. - listener, err := net.Listen("tcp", serverArgs.Listener.Addr().String()) - if err != nil { - return err - } - serverArgs.Listener = listener - serverArgs.Knobs.Server.(*server.TestingKnobs).RPCListener = serverArgs.Listener - } else { - serverArgs.Addr = "" - // Try and point the server to a live server in the cluster to join. - for i := range tc.Servers { - if !tc.ServerStopped(i) { - serverArgs.JoinAddr = tc.Servers[i].ServingRPCAddr() + if !tc.clusterArgs.ReusableListeners { + if idx == 0 { + // If it's the first server, then we need to restart the RPC listener by hand. + // Look at NewTestCluster for more details. 
+ listener, err := net.Listen("tcp", serverArgs.Listener.Addr().String()) + if err != nil { + return err + } + serverArgs.Listener = listener + serverArgs.Knobs.Server.(*server.TestingKnobs).RPCListener = serverArgs.Listener + } else { + serverArgs.Addr = "" + // Try and point the server to a live server in the cluster to join. + for i := range tc.Servers { + if !tc.ServerStopped(i) { + serverArgs.JoinAddr = tc.Servers[i].ServingRPCAddr() + } } } } diff --git a/pkg/ts/BUILD.bazel b/pkg/ts/BUILD.bazel index 708e41c32f5e..00aea0189c95 100644 --- a/pkg/ts/BUILD.bazel +++ b/pkg/ts/BUILD.bazel @@ -38,6 +38,7 @@ go_library( "//pkg/util/metric", "//pkg/util/mon", "//pkg/util/quotapool", + "//pkg/util/startup", "//pkg/util/stop", "//pkg/util/timeutil", "@com_github_cockroachdb_errors//:errors", diff --git a/pkg/ts/db.go b/pkg/ts/db.go index b812e4136560..312acbec22ec 100644 --- a/pkg/ts/db.go +++ b/pkg/ts/db.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/ts/tspb" "github.com/cockroachdb/cockroach/pkg/util/contextutil" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/startup" "github.com/cockroachdb/cockroach/pkg/util/stop" ) @@ -164,15 +165,17 @@ func (db *DB) PollSource( // time series data from the DataSource and store it. func (p *poller) start() { // Poll once immediately and synchronously. - p.poll() + // Wrap context as a startup context to enable access to kv on startup path + // without retries. + p.poll(p.AnnotateCtx(startup.WithoutChecks(context.Background()))) bgCtx := p.AnnotateCtx(context.Background()) - _ = p.stopper.RunAsyncTask(bgCtx, "ts-poller", func(context.Context) { + _ = p.stopper.RunAsyncTask(bgCtx, "ts-poller", func(ctx context.Context) { ticker := time.NewTicker(p.frequency) defer ticker.Stop() for { select { case <-ticker.C: - p.poll() + p.poll(ctx) case <-p.stopper.ShouldQuiesce(): return } @@ -182,12 +185,11 @@ func (p *poller) start() { // poll retrieves data from the underlying DataSource a single time, storing any // returned time series data on the server. -func (p *poller) poll() { +func (p *poller) poll(ctx context.Context) { if !TimeseriesStorageEnabled.Get(&p.db.st.SV) { return } - ctx := p.AnnotateCtx(context.Background()) if err := p.stopper.RunTask(ctx, "ts.poller: poll", func(ctx context.Context) { data := p.source.GetTimeSeriesData() if len(data) == 0 { diff --git a/pkg/util/randutil/rand.go b/pkg/util/randutil/rand.go index 4f2ad837758a..daa2be07a772 100644 --- a/pkg/util/randutil/rand.go +++ b/pkg/util/randutil/rand.go @@ -24,6 +24,39 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/syncutil" ) +// lockedSource is a thread safe math/rand.Source. See math/rand/rand.go. +type lockedSource struct { + mu syncutil.Mutex + src rand.Source64 +} + +// NewLockedSource creates random source protected by mutex. +func NewLockedSource(seed int64) rand.Source { + return &lockedSource{ + src: rand.NewSource(seed).(rand.Source64), + } +} + +func (rng *lockedSource) Int63() (n int64) { + rng.mu.Lock() + n = rng.src.Int63() + rng.mu.Unlock() + return +} + +func (rng *lockedSource) Uint64() (n uint64) { + rng.mu.Lock() + n = rng.src.Uint64() + rng.mu.Unlock() + return +} + +func (rng *lockedSource) Seed(seed int64) { + rng.mu.Lock() + rng.src.Seed(seed) + rng.mu.Unlock() +} + // globalSeed contains a pseudo random seed that should only be used in tests. var globalSeed int64 @@ -70,6 +103,16 @@ func NewPseudoRand() (*rand.Rand, int64) { // seed. 
This rand.Rand is useful in testing to produce deterministic, // reproducible behavior. func NewTestRand() (*rand.Rand, int64) { + return newTestRandImpl(rand.NewSource) +} + +// NewLockedTestRand is identical to NewTestRand but returned rand.Rand is using +// thread safe underlying source. +func NewLockedTestRand() (*rand.Rand, int64) { + return newTestRandImpl(NewLockedSource) +} + +func newTestRandImpl(f func(int64) rand.Source) (*rand.Rand, int64) { mtx.Lock() defer mtx.Unlock() fxn := getTestName() @@ -78,10 +121,10 @@ func NewTestRand() (*rand.Rand, int64) { // the global seed so that individual tests are reproducible using the // random seed. lastTestName = fxn - rng = rand.New(rand.NewSource(globalSeed)) + rng = rand.New(f(globalSeed)) } seed := rng.Int63() - return rand.New(rand.NewSource(seed)), seed + return rand.New(f(seed)), seed } // NewTestRandWithSeed returns an instance of math/rand.Rand, similar to diff --git a/pkg/util/startup/BUILD.bazel b/pkg/util/startup/BUILD.bazel new file mode 100644 index 000000000000..b553a743a322 --- /dev/null +++ b/pkg/util/startup/BUILD.bazel @@ -0,0 +1,43 @@ +load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data") +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "startup", + srcs = ["retry.go"], + importpath = "github.com/cockroachdb/cockroach/pkg/util/startup", + visibility = ["//visibility:public"], + deps = [ + "//pkg/roachpb", + "//pkg/util", + "//pkg/util/log", + "//pkg/util/retry", + "//pkg/util/syncutil", + "@com_github_cockroachdb_errors//:errors", + "@com_github_petermattis_goid//:goid", + ], +) + +go_test( + name = "startup_test", + srcs = ["startup_test.go"], + args = ["-test.timeout=295s"], + deps = [ + "//pkg/base", + "//pkg/keys", + "//pkg/roachpb", + "//pkg/security/securityassets", + "//pkg/security/securitytest", + "//pkg/server", + "//pkg/spanconfig", + "//pkg/testutils", + "//pkg/testutils/serverutils", + "//pkg/testutils/skip", + "//pkg/testutils/testcluster", + "//pkg/util/leaktest", + "//pkg/util/log", + "//pkg/util/randutil", + "@com_github_stretchr_testify//require", + ], +) + +get_x_data(name = "get_x_data") diff --git a/pkg/util/startup/retry.go b/pkg/util/startup/retry.go new file mode 100644 index 000000000000..e7868289a8d6 --- /dev/null +++ b/pkg/util/startup/retry.go @@ -0,0 +1,195 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +// Package startup provides functionality for retrying kv operations on server +// startup. +// +// Server startup is now heavily dependent on kv for retrieving various +// metadata. It is possible for operations to fail with ambiguous error in case +// of timeouts. This problem is exacerbated by introduction of circuit breakers. +// Circuit breaker will fail fast if previous access to the replica failed +// and will try to do probing and reset in the background. +// Because of this, startup operations could fail immediately if consensus is +// lost on a range thus preventing node from restarting. 
If the node itself is +// needed to reestablish range consensus then the node would not be able to +// rejoin the cluster at all after circuit breaker is activated on that range +// unless circuit breaker is reset by restarting all nodes. +// +// To address this problem, all kv operations that are part of the start-up +// sequence must be retried if they encounter kvpb.ReplicaUnavailableError. +// Those errors could be returned from both kv and sql queries. +// +// This package provides functionality to wrap functions in retry loops as well +// as set of assertions to verify that all operations are doing retries during +// startup. +// +// This behaviour is achieved by checking call stack in kv operations and +// queries in internal sql executor. Latter is needed because we can't assert kv +// operations run by executor in separate go routines. +// Assertions expect call context be tagged with startupRetryKey. If they detect +// that call is on startup path and don't have retry tag, server will crash. +// Provided retry functions tag context that is passed to functions being +// retried. +// If test has a test only code path (e.g. avoid background operations for test) +// it is possible to disable retry check by using WithoutChecks decorator for +// context. +// +// Retry tracking is only done when build is run with util.RaceEnabled. +// +// Separate Begin function should be called at the beginning of the startup +// and returned cleanup function should be called at the end. This ensures that +// startup stack is correctly detected and that test clusters are not +// excessively penalized by assertions when startup is complete. +// +// If your test fails because startup assertion failed, then there's a new code +// path run during server startup which is not correctly wrapped in retry loop +// and it could cause node startup failures. You need to identify what is called +// from Server.PreStart and add retry accordingly. +package startup + +import ( + "context" + "sync/atomic" + "time" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/retry" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/errors" + "github.com/petermattis/goid" +) + +// startupRetryKey context key used by assertions to track that code is running +// inside startup retry loop. +type startupRetryKey struct{} + +// startupRetryOpts is retry config for startup operations. +var startupRetryOpts = retry.Options{ + InitialBackoff: 100 * time.Millisecond, + MaxBackoff: 3 * time.Second, + Multiplier: 2, +} + +// runningStartup is a counter showing number of concurrently running server +// startups. If it is zero, then startups are finished and we should not try +// to run startup retry checks. +// This counter only makes sense and used when running TestCluster and is an +// optimization to stop doing any potentially expensive assertions and obtaining +// startupGoroutineIDs lock once servers are running. +// Mind that this is a best effort tracking and assertions themselves should +// not give false positives/negatives regardless of this counter value. +var runningStartup atomic.Int32 + +type goroutineIDs struct { + syncutil.RWMutex + ids map[int64]bool +} + +// startupGoroutineIDs contains id's of goroutines on which startup is running. +// We could have more than a single one in integration tests where TestCluster +// uses parallel startup. 
+var startupGoroutineIDs = goroutineIDs{ + ids: make(map[int64]bool), +} + +// WithoutChecks adds startup tag to context to allow kv and sql executor access +// from startup path without wrapping methods in RunIdempotentWithRetry*. +func WithoutChecks(ctx context.Context) context.Context { + return context.WithValue(ctx, startupRetryKey{}, "bypass retry check") +} + +// RunIdempotentWithRetry synchronously runs a function with retry of circuit +// breaker errors on startup. Since we retry ambiguous errors unconditionally, +// operations should be idempotent by nature. If this is not possible, then +// retry should be performed explicitly while using WithoutChecks context +// to suppress safety mechanisms. +func RunIdempotentWithRetry( + ctx context.Context, quiesce <-chan struct{}, opName string, f func(ctx context.Context) error, +) error { + ctx = context.WithValue(ctx, startupRetryKey{}, "in retry") + every := log.Every(5 * time.Second) + // Retry failures indefinitely until context is cancelled. + var err error + retryOpts := startupRetryOpts + retryOpts.Closer = quiesce + for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); { + err = f(ctx) + if err == nil { + break + } + if ctx.Err() != nil || !IsRetryableReplicaError(err) { + break + } + if every.ShouldLog() { + log.Infof(ctx, "failed %s during node startup, retrying %s", opName, err) + } + } + return err +} + +// Begin ensures that startup detection is correctly identifying startup stack +// and initializing internal assertions for tests. It must be called first +// on server startup before any calls to kv or sql are made. +// Returned function must be called when startup function is complete. +func Begin(ctx context.Context) func() { + if !util.RaceEnabled { + return func() {} + } + + // Ensure we don't try to call add twice by mistake before inserting our ID. + startupID := goid.Get() + startupGoroutineIDs.Lock() + defer startupGoroutineIDs.Unlock() + if _, ok := startupGoroutineIDs.ids[startupID]; ok { + log.Fatal(ctx, "startup.Begin() is called twice") + } + startupGoroutineIDs.ids[startupID] = true + + // Maintain counter as atomic to avoid doing full locks on every send + // operation. + runningStartup.Add(1) + return func() { + startupGoroutineIDs.Lock() + delete(startupGoroutineIDs.ids, startupID) + startupGoroutineIDs.Unlock() + runningStartup.Add(-1) + } +} + +// AssertStartupRetry is called by dist sender and internal sql executor to +// ensure that server startup is always performing retry of kv operations. +func AssertStartupRetry(ctx context.Context) { + if !util.RaceEnabled { + return + } + if runningStartup.Load() < 1 { + return + } + + rv := ctx.Value(startupRetryKey{}) + if rv == nil && inStartup() { + log.Fatal(ctx, "startup query called outside of startup retry loop. See util/startup/retry.go docs") + } +} + +// inStartup returns true if currently running go routine called Begin before. +func inStartup() bool { + currentID := goid.Get() + startupGoroutineIDs.RLock() + defer startupGoroutineIDs.RUnlock() + return startupGoroutineIDs.ids[currentID] +} + +// IsRetryableReplicaError returns true for replica availability errors. 
+func IsRetryableReplicaError(err error) bool { + return errors.HasType(err, (*roachpb.ReplicaUnavailableError)(nil)) || errors.HasType(err, (*roachpb.AmbiguousResultError)(nil)) +} diff --git a/pkg/util/startup/startup_test.go b/pkg/util/startup/startup_test.go new file mode 100644 index 000000000000..8ee26510d26e --- /dev/null +++ b/pkg/util/startup/startup_test.go @@ -0,0 +1,278 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package startup_test + +import ( + "context" + "fmt" + "os" + "strconv" + "strings" + "sync" + "sync/atomic" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/security/securityassets" + "github.com/cockroachdb/cockroach/pkg/security/securitytest" + "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/spanconfig" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/skip" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/randutil" + "github.com/stretchr/testify/require" +) + +func TestMain(m *testing.M) { + securityassets.SetLoader(securitytest.EmbeddedAssets) + serverutils.InitTestServerFactory(server.TestServerFactory) + serverutils.InitTestClusterFactory(testcluster.TestClusterFactory) + os.Exit(m.Run()) +} + +// TestStartupFailure can help reproducing failures found in randomized test as +// it may be tricky to get the same results easily. To reproduce failure, use +// failing range from test and plug in into faultKey. +// As of time of writing following ranges trigger retries: +// - /System/NodeLivenessMax (keys.BootstrapVersionKey) +// - /Table/11 (keys.SystemSQLCodec.TablePrefix(11)) +// If startup.IsRetryableReplicaError is changed to return false every time, +// test should fail if range from above is used. +func TestStartupFailure(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + skip.IgnoreLint(t, "this test should be used to reproduce failures from random tests") + + // Set faultyKey to suspected range that is causing startup failure, comment + // out skip statement above and run the test. + faultyKey := keys.BootstrapVersionKey + runCircuitBreakerTestForKey(t, + func(spans []roachpb.Span) (good, bad []roachpb.Span) { + for _, span := range spans { + if span.ContainsKey(faultyKey) { + bad = append(bad, span) + } else { + good = append(good, span) + } + } + return good, bad + }) +} + +// TestStartupFailureRandomRange verifies that no range with transient failure +// would cause node startup to fail. +// To achieve this, test picks a random range then relocates replicas of the +// range so that restarting a node would cause this range to lose quorum, +// stops node, ensures circuit breaker is set and starts node back. The node +// should start without raising error (for detailed explanation see doc on +// runCircuitBreakerTestForKey). 
+// If this test fails, error should indicate which range failed and hopefully +// error will also contain call stack that caused failure. This range could be +// used in TestStartupFailure to investigate code path not covered by startup +// retries. +func TestStartupFailureRandomRange(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + // This test takes 30s and so we don't want it to run in the "blocking path" + // of CI at all, and we also don't want to stress it in nightlies as part of + // a big package (where it will take a lot of time that could be spent running + // "faster" tests). In this package, it is the only test and so it's fine to + // run it under nightly (skipping nightly stressrace because race builds with + // many nodes are very resource intensive and tend to collapse). + skip.UnderStressRace(t, "6 nodes with replication is too slow for stress race") + if !skip.NightlyStress() { + skip.IgnoreLint(t, "test takes 30s to run due to circuit breakers and timeouts") + } + + rng, seed := randutil.NewTestRand() + t.Log("TestStartupFailureRandomRange using seed", seed) + runCircuitBreakerTestForKey(t, + func(spans []roachpb.Span) (good, bad []roachpb.Span) { + spans = append([]roachpb.Span(nil), spans...) // avoid mutating input slice + badRange := rng.Intn(len(spans)) + t.Log("triggering loss of quorum on range", spans[badRange].Key.String()) + spans[0], spans[badRange] = spans[badRange], spans[0] // swap badRange to the front + return spans[1:], spans[:1] // good, bad + }) +} + +// runCircuitBreakerTestForKey performs node restart in presence of ranges that +// lost their quorum. +// To achieve this, selected range will be placed so that quorum-1 replicas will +// be located on live nodes, one replica would be placed on node under test, +// and remaining replicas would be placed on nodes that would be stopped. +// All remaining ranges would be placed so that live nodes would maintain quorum +// throughout the test. +// Node lifecycle during test: +// 1, 2, 3 - live +// 4, 5 - stopped +// 6 - stopped and restarted +// To ensure that replicas are not moved, all queues and rebalancer are stopped +// once initial replication is done. +// To ensure circuit breaker is engaged on the broken range, test will perform +// a probe write to it after node is stopped. +// After that node is restarted and test verifies that start doesn't raise an +// error. 
+func runCircuitBreakerTestForKey( + t *testing.T, faultyRangeSelector func([]roachpb.Span) (good, bad []roachpb.Span), +) { + const ( + nodes = 6 + ) + ctx := context.Background() + + lReg := testutils.NewListenerRegistry() + defer lReg.Close() + reg := server.NewStickyInMemEnginesRegistry() + defer reg.CloseAllStickyInMemEngines() + + args := base.TestClusterArgs{ + ServerArgsPerNode: make(map[int]base.TestServerArgs), + ReusableListeners: true, + } + var enableFaults atomic.Bool + for i := 0; i < nodes; i++ { + a := base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Server: &server.TestingKnobs{ + StickyEngineRegistry: reg, + }, + SpanConfig: &spanconfig.TestingKnobs{ + ConfigureScratchRange: true, + }, + }, + StoreSpecs: []base.StoreSpec{ + { + InMemory: true, + StickyInMemoryEngineID: strconv.FormatInt(int64(i), 10), + }, + }, + Listener: lReg.GetOrFail(t, i), + } + args.ServerArgsPerNode[i] = a + } + tc := testcluster.NewTestCluster(t, nodes, args) + tc.Start(t) + defer tc.Stopper().Stop(ctx) + require.NoError(t, tc.WaitFor5NodeReplication(), "failed to succeed 5x replication") + + tc.ToggleReplicateQueues(false) + c := tc.ServerConn(0) + _, err := c.ExecContext(ctx, "set cluster setting kv.allocator.load_based_rebalancing='off'") + require.NoError(t, err, "failed to disable load rebalancer") + _, err = c.ExecContext(ctx, + fmt.Sprintf("set cluster setting kv.replica_circuit_breaker.slow_replication_threshold='%s'", + base.SlowRequestThreshold.String())) + require.NoError(t, err, "failed to lower circuit breaker threshold") + + // replicaTargets holds template for replica placements for available and + // unavailable range configurations. + type replicaTargets struct { + safe, unsafe []roachpb.StoreID + } + + // Templates define replica placements different replication factors. + // Safe configuration ensures that replicas will maintain quorum when nodes + // are stopped while unsafe will lose quorum. 
+ t3repl := replicaTargets{ + safe: []roachpb.StoreID{1, 2, 3}, + unsafe: []roachpb.StoreID{1, 4, 6}, + } + t5repl := replicaTargets{ + safe: []roachpb.StoreID{1, 2, 3, 4, 5}, + unsafe: []roachpb.StoreID{1, 2, 4, 5, 6}, + } + db := tc.Server(0).DB() + prepRange := func(rk roachpb.Key, fail bool) roachpb.RKey { + d := tc.LookupRangeOrFatal(t, rk) + replicaTemplate := t3repl + if len(d.InternalReplicas) > 3 { + replicaTemplate = t5repl + } + targets := replicaTemplate.safe + if fail { + targets = replicaTemplate.unsafe + } + var voters []roachpb.ReplicationTarget + for _, storeID := range targets { + voters = append(voters, roachpb.ReplicationTarget{ + NodeID: roachpb.NodeID(storeID), + StoreID: storeID, + }) + } + for { + err := db.AdminRelocateRange(ctx, rk, voters, nil, true) + if err == nil { + break + } + } + return d.StartKey + } + + var rangeSpans []roachpb.Span + r, err := c.QueryContext(ctx, "select range_id, start_key, end_key from crdb_internal.ranges_no_leases order by start_key") + require.NoError(t, err, "failed to query ranges") + for r.Next() { + var rangeID int + var key roachpb.Key + var endKey roachpb.Key + require.NoError(t, r.Scan(&rangeID, &key, &endKey), "failed to scan range data from query") + rangeSpans = append(rangeSpans, roachpb.Span{ + Key: key, + EndKey: endKey, + }) + } + good, bad := faultyRangeSelector(rangeSpans) + for _, span := range good { + prepRange(span.Key, false) + } + var ranges []string + for _, span := range bad { + prepRange(span.Key, true) + ranges = append(ranges, span.String()) + } + rangesList := fmt.Sprintf("[%s]", strings.Join(ranges, ", ")) + + // Remove nodes permanently to only leave quorum on planned ranges. + tc.StopServer(3) + tc.StopServer(4) + + // Stop node with replicas that would leave ranges without quorum. + tc.StopServer(5) + + // Probe compromised ranges to trigger circuit breakers on them. If we don't + // do this, then restart queries will wait for quorum to be reestablished with + // restarting node without failing. + var wg sync.WaitGroup + wg.Add(len(bad)) + for _, span := range bad { + go func(key roachpb.Key) { + defer wg.Done() + _ = db.Put(context.Background(), keys.RangeProbeKey(roachpb.RKey(key)), "") + }(span.Key) + } + wg.Wait() + + // Restart node and check that it succeeds in reestablishing range quorum + // necessary for startup actions. + lReg.ReopenOrFail(t, 5) + err = tc.RestartServer(5) + require.NoError(t, err, "restarting server with range(s) %s tripping circuit breaker", rangesList) + + // Disable faults to make it easier for cluster to stop. + enableFaults.Store(false) +}
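
Editor's note: for readers skimming the patch, the core pattern it introduces is small — any idempotent KV access on the startup path gets wrapped in startup.RunIdempotentWithRetry so that circuit-breaker errors (ReplicaUnavailableError, AmbiguousResultError) are retried instead of failing PreStart, while the wrapped context carries the marker that satisfies AssertStartupRetry in DistSender and the internal executor. Below is a minimal sketch of that pattern, condensed from the SQLServer.preStart hunk above; the package clause and the readBootstrapVersion helper are illustrative names, not part of the patch.

package startupexample // hypothetical package, for illustration only

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/keys"
	"github.com/cockroachdb/cockroach/pkg/kv"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/util/startup"
	"github.com/cockroachdb/cockroach/pkg/util/stop"
)

// readBootstrapVersion shows how a startup-path KV read is made safe against
// tripped circuit breakers: the closure is idempotent, so it can simply be
// re-run whenever a retryable replica error is returned.
func readBootstrapVersion(
	ctx context.Context, db *kv.DB, stopper *stop.Stopper,
) (roachpb.Version, error) {
	var v roachpb.Version
	err := startup.RunIdempotentWithRetry(ctx,
		stopper.ShouldQuiesce(),   // stop retrying once the server quiesces
		"sql get cluster version", // operation name used in retry log messages
		func(ctx context.Context) error {
			// The context passed to the closure is tagged with the
			// startup-retry marker, so AssertStartupRetry does not fatal
			// under race builds.
			return db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
				return txn.GetProto(ctx, keys.BootstrapVersionKey, &v)
			})
		})
	return v, err
}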
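
The test-side half of the change is the listener-reuse plumbing: TestClusterArgs.ReusableListeners plus testutils.ListenerRegistry let a restarted test server come back on the same port, which is what makes the restart-under-faults tests above possible. A condensed sketch of that wiring follows, assuming the usual TestMain setup from server_startup_test.go (security assets and test server/cluster factories); the test name and sticky engine ID are illustrative.

// TestRestartKeepsPort is a hypothetical, pared-down variant of
// TestStartupInjectedFailureSingleNode showing only the listener-reuse wiring.
func TestRestartKeepsPort(t *testing.T) {
	ctx := context.Background()

	lReg := testutils.NewListenerRegistry()
	defer lReg.Close()
	reg := server.NewStickyInMemEnginesRegistry()
	defer reg.CloseAllStickyInMemEngines()

	tc := testcluster.NewTestCluster(t, 1, base.TestClusterArgs{
		ServerArgs: base.TestServerArgs{
			// Sticky in-memory engines keep the store's data across restart.
			StoreSpecs: []base.StoreSpec{{InMemory: true, StickyInMemoryEngineID: "1"}},
			// The registry's listener survives net.Listener.Close(), so the
			// restarted server reuses the same port.
			Listener: lReg.GetOrFail(t, 0),
			Knobs: base.TestingKnobs{
				Server: &server.TestingKnobs{StickyEngineRegistry: reg},
			},
		},
		// Tell RestartServer not to replace the injected listener.
		ReusableListeners: true,
	})
	tc.Start(t)
	defer tc.Stopper().Stop(ctx)

	tc.StopServer(0)        // closing the wrapper only pauses Accept
	lReg.ReopenOrFail(t, 0) // resume accepting connections before the restart
	require.NoError(t, tc.RestartServer(0), "failed to restart server")
}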