From 0d894beb65749613ec8179ca3337f2bfa8a620ae Mon Sep 17 00:00:00 2001 From: Barry He Date: Fri, 5 Mar 2021 04:27:06 -0500 Subject: [PATCH 1/2] sql: use stopper for virtual table row pushing go routine Previously, the row pushing go routine in virtual table would not go through the stopper. Now it uses the stopper to run an async task, allowing the stopper to help with error handling. Release justification: bug fix and low-risk update Release note: None --- pkg/sql/crdb_internal.go | 25 +++++++++++-------------- pkg/sql/exec_factory_util.go | 2 +- pkg/sql/virtual_schema.go | 20 ++++++++++++++------ pkg/sql/virtual_table.go | 21 ++++++++++++--------- pkg/sql/virtual_table_test.go | 31 ++++++++++++++++++++----------- 5 files changed, 58 insertions(+), 41 deletions(-) diff --git a/pkg/sql/crdb_internal.go b/pkg/sql/crdb_internal.go index 1b8156409f8d..1015e2e4759a 100644 --- a/pkg/sql/crdb_internal.go +++ b/pkg/sql/crdb_internal.go @@ -63,6 +63,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/json" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" + "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/errors" @@ -296,7 +297,7 @@ CREATE TABLE crdb_internal.tables ( parent_schema_id INT NOT NULL, locality TEXT )`, - generator: func(ctx context.Context, p *planner, dbDesc *dbdesc.Immutable) (virtualTableGenerator, cleanupFunc, error) { + generator: func(ctx context.Context, p *planner, dbDesc *dbdesc.Immutable, stopper *stop.Stopper) (virtualTableGenerator, cleanupFunc, error) { row := make(tree.Datums, 14) worker := func(pusher rowPusher) error { descs, err := p.Descriptors().GetAllDescriptors(ctx, p.txn) @@ -406,8 +407,7 @@ CREATE TABLE crdb_internal.tables ( } return nil } - next, cleanup := setupGenerator(ctx, worker) - return next, cleanup, nil + return setupGenerator(ctx, worker, stopper) }, } @@ -616,7 +616,7 @@ CREATE TABLE crdb_internal.jobs ( coordinator_id INT )`, comment: `decoded job metadata from system.jobs (KV scan)`, - generator: func(ctx context.Context, p *planner, _ *dbdesc.Immutable) (virtualTableGenerator, cleanupFunc, error) { + generator: func(ctx context.Context, p *planner, _ *dbdesc.Immutable, _ *stop.Stopper) (virtualTableGenerator, cleanupFunc, error) { currentUser := p.SessionData().User() isAdmin, err := p.HasAdminRole(ctx) if err != nil { @@ -2152,7 +2152,7 @@ CREATE TABLE crdb_internal.table_columns ( hidden BOOL NOT NULL ) `, - generator: func(ctx context.Context, p *planner, dbContext *dbdesc.Immutable) (virtualTableGenerator, cleanupFunc, error) { + generator: func(ctx context.Context, p *planner, dbContext *dbdesc.Immutable, stopper *stop.Stopper) (virtualTableGenerator, cleanupFunc, error) { row := make(tree.Datums, 8) worker := func(pusher rowPusher) error { return forEachTableDescAll(ctx, p, dbContext, hideVirtual, @@ -2188,8 +2188,7 @@ CREATE TABLE crdb_internal.table_columns ( }, ) } - next, cleanup := setupGenerator(ctx, worker) - return next, cleanup, nil + return setupGenerator(ctx, worker, stopper) }, } @@ -2209,7 +2208,7 @@ CREATE TABLE crdb_internal.table_indexes ( is_inverted BOOL NOT NULL ) `, - generator: func(ctx context.Context, p *planner, dbContext *dbdesc.Immutable) (virtualTableGenerator, cleanupFunc, error) { + generator: func(ctx context.Context, p *planner, dbContext *dbdesc.Immutable, stopper *stop.Stopper) (virtualTableGenerator, cleanupFunc, error) { 
primary := tree.NewDString("primary") secondary := tree.NewDString("secondary") row := make(tree.Datums, 7) @@ -2242,8 +2241,7 @@ CREATE TABLE crdb_internal.table_indexes ( }, ) } - next, cleanup := setupGenerator(ctx, worker) - return next, cleanup, nil + return setupGenerator(ctx, worker, stopper) }, } @@ -2670,7 +2668,7 @@ CREATE TABLE crdb_internal.ranges_no_leases ( split_enforced_until TIMESTAMP ) `, - generator: func(ctx context.Context, p *planner, _ *dbdesc.Immutable) (virtualTableGenerator, cleanupFunc, error) { + generator: func(ctx context.Context, p *planner, _ *dbdesc.Immutable, _ *stop.Stopper) (virtualTableGenerator, cleanupFunc, error) { if err := p.RequireAdminRole(ctx, "read crdb_internal.ranges_no_leases"); err != nil { return nil, nil, err } @@ -3613,7 +3611,7 @@ CREATE TABLE crdb_internal.partitions ( subzone_id INT -- references a subzone id in the crdb_internal.zones table ) `, - generator: func(ctx context.Context, p *planner, dbContext *dbdesc.Immutable) (virtualTableGenerator, cleanupFunc, error) { + generator: func(ctx context.Context, p *planner, dbContext *dbdesc.Immutable, stopper *stop.Stopper) (virtualTableGenerator, cleanupFunc, error) { dbName := "" if dbContext != nil { dbName = dbContext.GetName() @@ -3629,8 +3627,7 @@ CREATE TABLE crdb_internal.partitions ( }) }) } - next, cleanup := setupGenerator(ctx, worker) - return next, cleanup, nil + return setupGenerator(ctx, worker, stopper) }, } diff --git a/pkg/sql/exec_factory_util.go b/pkg/sql/exec_factory_util.go index 89653f62fc26..7be30f4c606d 100644 --- a/pkg/sql/exec_factory_util.go +++ b/pkg/sql/exec_factory_util.go @@ -225,7 +225,7 @@ func constructVirtualScan( indexDesc := index.(*optVirtualIndex).desc columns, constructor := virtual.getPlanInfo( table.(*optVirtualTable).desc, - indexDesc, params.IndexConstraint) + indexDesc, params.IndexConstraint, p.execCfg.DistSQLPlanner.stopper) n, err := delayedNodeCallback(&delayedNode{ name: fmt.Sprintf("%s@%s", table.Name(), index.Name()), diff --git a/pkg/sql/virtual_schema.go b/pkg/sql/virtual_schema.go index 0a2fa7bb8071..2c9f36437e93 100644 --- a/pkg/sql/virtual_schema.go +++ b/pkg/sql/virtual_schema.go @@ -36,6 +36,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry" "github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented" "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/errors" ) @@ -109,7 +110,7 @@ type virtualSchemaTable struct { // generator, if non-nil, is a function that is used when creating a // virtualTableNode. This function returns a virtualTableGenerator function // which generates the next row of the virtual table when called. 
- generator func(ctx context.Context, p *planner, db *dbdesc.Immutable) (virtualTableGenerator, cleanupFunc, error) + generator func(ctx context.Context, p *planner, db *dbdesc.Immutable, stopper *stop.Stopper) (virtualTableGenerator, cleanupFunc, error) } // virtualSchemaView represents a view within a virtualSchema @@ -448,6 +449,7 @@ func (e *virtualDefEntry) getPlanInfo( table catalog.TableDescriptor, index *descpb.IndexDescriptor, idxConstraint *constraint.Constraint, + stopper *stop.Stopper, ) (colinfo.ResultColumns, virtualTableConstructor) { var columns colinfo.ResultColumns for _, col := range e.desc.PublicColumns() { @@ -483,7 +485,7 @@ func (e *virtualDefEntry) getPlanInfo( } if def.generator != nil { - next, cleanup, err := def.generator(ctx, p, dbDesc) + next, cleanup, err := def.generator(ctx, p, dbDesc, stopper) if err != nil { return nil, err } @@ -492,14 +494,17 @@ func (e *virtualDefEntry) getPlanInfo( constrainedScan := idxConstraint != nil && !idxConstraint.IsUnconstrained() if !constrainedScan { - generator, cleanup := setupGenerator(ctx, func(pusher rowPusher) error { + generator, cleanup, setupError := setupGenerator(ctx, func(pusher rowPusher) error { return def.populate(ctx, p, dbDesc, func(row ...tree.Datum) error { if err := e.validateRow(row, columns); err != nil { return err } return pusher.pushRow(row...) }) - }) + }, stopper) + if setupError != nil { + return nil, setupError + } return p.newVirtualTableNode(columns, generator, cleanup), nil } @@ -514,8 +519,11 @@ func (e *virtualDefEntry) getPlanInfo( columnIdxMap := catalog.ColumnIDToOrdinalMap(table.PublicColumns()) indexKeyDatums := make([]tree.Datum, len(index.ColumnIDs)) - generator, cleanup := setupGenerator(ctx, e.makeConstrainedRowsGenerator( - ctx, p, dbDesc, index, indexKeyDatums, columnIdxMap, idxConstraint, columns)) + generator, cleanup, setupError := setupGenerator(ctx, e.makeConstrainedRowsGenerator( + ctx, p, dbDesc, index, indexKeyDatums, columnIdxMap, idxConstraint, columns), stopper) + if setupError != nil { + return nil, setupError + } return p.newVirtualTableNode(columns, generator, cleanup), nil default: diff --git a/pkg/sql/virtual_table.go b/pkg/sql/virtual_table.go index 34247a8ed39f..77e597c66966 100644 --- a/pkg/sql/virtual_table.go +++ b/pkg/sql/virtual_table.go @@ -23,6 +23,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/rowcontainer" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/util/cancelchecker" + "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/errors" ) @@ -67,8 +68,8 @@ type virtualTableGeneratorResponse struct { // * cleanup: Performs all cleanup. This function must be called exactly once // to ensure that resources are cleaned up. func setupGenerator( - ctx context.Context, worker func(pusher rowPusher) error, -) (next virtualTableGenerator, cleanup cleanupFunc) { + ctx context.Context, worker func(pusher rowPusher) error, stopper *stop.Stopper, +) (next virtualTableGenerator, cleanup cleanupFunc, setupError error) { var cancel func() ctx, cancel = context.WithCancel(ctx) var wg sync.WaitGroup @@ -82,14 +83,12 @@ func setupGenerator( // computation through comm, and the generator places rows to consume // back into comm. comm := make(chan virtualTableGeneratorResponse) - addRow := func(datums ...tree.Datum) error { select { case <-ctx.Done(): return cancelchecker.QueryCanceledError case comm <- virtualTableGeneratorResponse{datums: datums}: } - // Block until the next call to cleanup() or next(). 
This allows us to // avoid issues with concurrent transaction usage if the worker is using // a transaction. Otherwise, worker could proceed running operations after @@ -107,7 +106,7 @@ func setupGenerator( } wg.Add(1) - go func() { + if setupError = stopper.RunAsyncTask(ctx, "sql.rowPusher: send rows", func(ctx context.Context) { defer wg.Done() // We wait until a call to next before starting the worker. This prevents // concurrent transaction usage during the startup phase. We also have to @@ -125,14 +124,19 @@ func setupGenerator( if errors.Is(err, cancelchecker.QueryCanceledError) { return } - // Notify that we are done sending rows. select { case <-ctx.Done(): return case comm <- virtualTableGeneratorResponse{err: err}: } - }() + }); setupError != nil { + // The presence of an error means the goroutine never started, + // thus wg.Done() is never called, which can result in + // cleanup() being blocked indefinitely on wg.Wait(). We call + // wg.Done() manually here to account for this case. + wg.Done() + } next = func() (tree.Datums, error) { // Notify the worker to begin computing a row. @@ -141,7 +145,6 @@ func setupGenerator( case <-ctx.Done(): return nil, cancelchecker.QueryCanceledError } - // Wait for the row to be sent. select { case <-ctx.Done(): @@ -150,7 +153,7 @@ func setupGenerator( return resp.datums, resp.err } } - return next, cleanup + return next, cleanup, setupError } // virtualTableNode is a planNode that constructs its rows by repeatedly diff --git a/pkg/sql/virtual_table_test.go b/pkg/sql/virtual_table_test.go index 4a08042eb08f..3b30dccd1534 100644 --- a/pkg/sql/virtual_table_test.go +++ b/pkg/sql/virtual_table_test.go @@ -18,6 +18,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/cancelchecker" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" ) @@ -25,8 +26,10 @@ import ( func TestVirtualTableGenerators(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) + stopper := stop.NewStopper() + ctx := context.Background() + defer stopper.Stop(ctx) t.Run("test cleanup", func(t *testing.T) { - ctx := context.Background() worker := func(pusher rowPusher) error { if err := pusher.pushRow(tree.NewDInt(1)); err != nil { return err @@ -36,8 +39,8 @@ func TestVirtualTableGenerators(t *testing.T) { } return nil } - - next, cleanup := setupGenerator(ctx, worker) + next, cleanup, setupError := setupGenerator(ctx, worker, stopper) + require.NoError(t, setupError) d, err := next() if err != nil { t.Fatal(err) @@ -50,7 +53,6 @@ func TestVirtualTableGenerators(t *testing.T) { t.Run("test worker error", func(t *testing.T) { // Test that if the worker returns an error we catch it. - ctx := context.Background() worker := func(pusher rowPusher) error { if err := pusher.pushRow(tree.NewDInt(1)); err != nil { return err @@ -60,7 +62,8 @@ func TestVirtualTableGenerators(t *testing.T) { } return errors.New("dummy error") } - next, cleanup := setupGenerator(ctx, worker) + next, cleanup, setupError := setupGenerator(ctx, worker, stopper) + require.NoError(t, setupError) _, err := next() require.NoError(t, err) _, err = next() @@ -71,12 +74,12 @@ func TestVirtualTableGenerators(t *testing.T) { }) t.Run("test no next", func(t *testing.T) { - ctx := context.Background() // Test we don't leak anything if we call cleanup before next. 
worker := func(pusher rowPusher) error { return nil } - _, cleanup := setupGenerator(ctx, worker) + _, cleanup, setupError := setupGenerator(ctx, worker, stopper) + require.NoError(t, setupError) cleanup() }) @@ -92,7 +95,8 @@ func TestVirtualTableGenerators(t *testing.T) { } return nil } - next, cleanup := setupGenerator(ctx, worker) + next, cleanup, setupError := setupGenerator(ctx, worker, stopper) + require.NoError(t, setupError) cancel() _, err := next() // There is a small chance that we race and don't return @@ -105,7 +109,8 @@ func TestVirtualTableGenerators(t *testing.T) { // Test cancellation after asking for a row. ctx, cancel = context.WithCancel(context.Background()) - next, cleanup = setupGenerator(ctx, worker) + next, cleanup, setupError = setupGenerator(ctx, worker, stopper) + require.NoError(t, setupError) row, err := next() require.NoError(t, err) require.Equal(t, tree.Datums{tree.NewDInt(1)}, row) @@ -116,7 +121,8 @@ func TestVirtualTableGenerators(t *testing.T) { // Test cancellation after asking for all the rows. ctx, cancel = context.WithCancel(context.Background()) - next, cleanup = setupGenerator(ctx, worker) + next, cleanup, setupError = setupGenerator(ctx, worker, stopper) + require.NoError(t, setupError) _, err = next() require.NoError(t, err) _, err = next() @@ -129,7 +135,9 @@ func TestVirtualTableGenerators(t *testing.T) { func BenchmarkVirtualTableGenerators(b *testing.B) { defer leaktest.AfterTest(b)() defer log.Scope(b).Close(b) + stopper := stop.NewStopper() ctx := context.Background() + defer stopper.Stop(ctx) worker := func(pusher rowPusher) error { for { if err := pusher.pushRow(tree.NewDInt(tree.DInt(1))); err != nil { @@ -138,7 +146,8 @@ func BenchmarkVirtualTableGenerators(b *testing.B) { } } b.Run("bench read", func(b *testing.B) { - next, cleanup := setupGenerator(ctx, worker) + next, cleanup, setupError := setupGenerator(ctx, worker, stopper) + require.NoError(b, setupError) b.ResetTimer() for i := 0; i < b.N; i++ { _, err := next() From 8560678a4d2e51159bb0ca6c1be54ca2672c33be Mon Sep 17 00:00:00 2001 From: Alex Lunev Date: Fri, 5 Mar 2021 11:36:29 -0800 Subject: [PATCH 2/2] kvserver: remove multiTestContext Closes #8299 With all the tests converted to use TestCluster/TestServer, we can finally remove multiTestContext and all of the dependent code. Release note: None Release justification: Cleans up unused test code. 
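The core of the first patch in this series is easier to see in isolation: a bare `go func() {...}()` becomes a `stopper.RunAsyncTask` call, and the WaitGroup is balanced by hand when the task is refused. The snippet below is a minimal sketch of that pattern only, not code from the patch; `startWorker` and the task name are hypothetical.

```go
package main

import (
	"context"
	"fmt"
	"sync"

	"github.com/cockroachdb/cockroach/pkg/util/stop"
)

// startWorker launches a background worker through the stopper instead of a
// bare `go` statement, mirroring the shape of the setupGenerator change. If
// RunAsyncTask refuses to start the task (for example because the stopper is
// already stopping), the deferred wg.Done() inside the task never runs, so the
// WaitGroup must be balanced manually to keep a later wg.Wait() from hanging.
func startWorker(ctx context.Context, stopper *stop.Stopper, wg *sync.WaitGroup) error {
	wg.Add(1)
	err := stopper.RunAsyncTask(ctx, "example: background worker", func(ctx context.Context) {
		defer wg.Done()
		// Do work until the caller cancels or the stopper quiesces.
		select {
		case <-ctx.Done():
		case <-stopper.ShouldQuiesce():
		}
	})
	if err != nil {
		// The task never started, so compensate for the wg.Add(1) above.
		wg.Done()
	}
	return err
}

func main() {
	stopper := stop.NewStopper()
	defer stopper.Stop(context.Background())

	ctx, cancel := context.WithCancel(context.Background())
	var wg sync.WaitGroup
	if err := startWorker(ctx, stopper, &wg); err != nil {
		fmt.Println("worker did not start:", err)
	}
	// Cleanup, as in the patch: cancel the worker's context, then wait for it.
	cancel()
	wg.Wait()
}
```

The compensating `wg.Done()` matters because a draining stopper returns an error without ever running the task body, so without it a later `wg.Wait()` (as in `cleanup`) would block forever.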
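As a rough illustration of what the TestCluster/TestServer conversions mentioned above look like, the sketch below stands in for the old mtc.replicateRange / mtc.transferLease flow. It is an assumption-laden example, not code from this patch: helper names such as ScratchRange, AddVotersOrFatal, and TransferRangeLease are recalled from pkg/testutils/testcluster and may differ slightly between versions.

```go
package kvserver_test

import (
	"context"
	"testing"

	"github.com/cockroachdb/cockroach/pkg/base"
	"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
	"github.com/cockroachdb/cockroach/pkg/util/log"
	"github.com/stretchr/testify/require"
)

// TestReplicationViaTestCluster is an illustrative stand-in for an mtc-style
// replication test after conversion: the cluster is started with manual
// replication, and the test drives replica and lease movement itself.
func TestReplicationViaTestCluster(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	ctx := context.Background()
	tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{
		// The test controls replication itself, as mtc tests did.
		ReplicationMode: base.ReplicationManual,
	})
	defer tc.Stopper().Stop(ctx)

	// Roughly mtc.replicateRange(rangeID, 1, 2): up-replicate a scratch range
	// onto the other two nodes. (Helper names are assumptions; see above.)
	scratchKey := tc.ScratchRange(t)
	desc := tc.AddVotersOrFatal(t, scratchKey, tc.Target(1), tc.Target(2))

	// Roughly mtc.transferLease(ctx, rangeID, 0, 1).
	require.NoError(t, tc.TransferRangeLease(desc, tc.Target(1)))
}
```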
--- pkg/kv/kvserver/BUILD.bazel | 3 - pkg/kv/kvserver/client_raft_test.go | 30 - pkg/kv/kvserver/client_test.go | 1296 --------------------------- pkg/kv/kvserver/helpers_test.go | 26 - 4 files changed, 1355 deletions(-) diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel index c3593c73ea99..c3f1cdf443a6 100644 --- a/pkg/kv/kvserver/BUILD.bazel +++ b/pkg/kv/kvserver/BUILD.bazel @@ -293,12 +293,10 @@ go_test( "//pkg/config", "//pkg/config/zonepb", "//pkg/gossip", - "//pkg/gossip/resolver", "//pkg/keys", "//pkg/kv", "//pkg/kv/kvclient", "//pkg/kv/kvclient/kvcoord", - "//pkg/kv/kvclient/rangecache", "//pkg/kv/kvserver/abortspan", "//pkg/kv/kvserver/apply", "//pkg/kv/kvserver/batcheval", @@ -374,7 +372,6 @@ go_test( "//pkg/util/timeutil", "//pkg/util/tracing", "//pkg/util/uuid", - "@com_github_cenkalti_backoff//:backoff", "@com_github_cockroachdb_circuitbreaker//:circuitbreaker", "@com_github_cockroachdb_cockroach_go//crdb", "@com_github_cockroachdb_datadriven//:datadriven", diff --git a/pkg/kv/kvserver/client_raft_test.go b/pkg/kv/kvserver/client_raft_test.go index deeac246a117..5efb67be1368 100644 --- a/pkg/kv/kvserver/client_raft_test.go +++ b/pkg/kv/kvserver/client_raft_test.go @@ -1763,36 +1763,6 @@ func TestStoreRangeUpReplicate(t *testing.T) { } } -// TestUnreplicateFirstRange verifies that multiTestContext still functions in -// the case where the first range (which contains range metadata) is -// unreplicated from the first store. This situation can arise occasionally in -// tests, as can a similar situation where the first store is no longer the lease holder of -// the first range; this verifies that those tests will not be affected. -// TODO(lunevalex): Remove this test when removing MTC, no need to convert it -func TestUnreplicateFirstRange(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - - cfg := kvserver.TestStoreConfig(nil) - cfg.TestingKnobs.DisableReplicateQueue = true - mtc := &multiTestContext{storeConfig: &cfg} - defer mtc.Stop() - mtc.Start(t, 3) - - const rangeID = roachpb.RangeID(1) - // Replicate the range to store 1. - mtc.replicateRange(rangeID, 1) - // Move the lease away from store 0 before removing its replica. - mtc.transferLease(context.Background(), rangeID, 0, 1) - // Unreplicate the from from store 0. - mtc.unreplicateRange(rangeID, 0) - require.NoError(t, mtc.waitForUnreplicated(rangeID, 0)) - // Replicate the range to store 2. The first range is no longer available on - // store 1, and this command will fail if that situation is not properly - // supported. - mtc.replicateRange(rangeID, 2) -} - // TestChangeReplicasDescriptorInvariant tests that a replica change aborts if // another change has been made to the RangeDescriptor since it was initiated. 
func TestChangeReplicasDescriptorInvariant(t *testing.T) { diff --git a/pkg/kv/kvserver/client_test.go b/pkg/kv/kvserver/client_test.go index 8b7dcd0c2b17..45facfc222a0 100644 --- a/pkg/kv/kvserver/client_test.go +++ b/pkg/kv/kvserver/client_test.go @@ -21,55 +21,35 @@ package kvserver_test import ( "context" "fmt" - "math/rand" - "net" - "reflect" "sort" - "strings" - "sync" "testing" "time" - "github.com/cenkalti/backoff" - circuit "github.com/cockroachdb/circuitbreaker" "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/config" - "github.com/cockroachdb/cockroach/pkg/config/zonepb" "github.com/cockroachdb/cockroach/pkg/gossip" - "github.com/cockroachdb/cockroach/pkg/gossip/resolver" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" - "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangecache" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rditer" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" - "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" - "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/catalog/bootstrap" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/hlc" - "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/metric" - "github.com/cockroachdb/cockroach/pkg/util/netutil" - "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/stop" - "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" - "github.com/cockroachdb/logtags" "github.com/kr/pretty" "github.com/stretchr/testify/require" - "go.etcd.io/etcd/raft/v3" - "google.golang.org/grpc" ) // createTestStore creates a test store using an in-memory @@ -264,1195 +244,6 @@ func createTestStoreWithOpts( return store } -type multiTestContext struct { - t testing.TB - storeConfig *kvserver.StoreConfig - manualClock *hlc.ManualClock - rpcContext *rpc.Context - // rpcTestingKnobs are optional configuration for the rpcContext. - rpcTestingKnobs rpc.ContextTestingKnobs - - // By default, a multiTestContext starts with a bunch of system ranges, just - // like a regular Server after bootstrap. If startWithSingleRange is set, - // we'll start with a single range spanning all the key space. The split - // queue, if not disabled, might then create other range system ranges. - startWithSingleRange bool - - nodeIDtoAddrMu struct { - *syncutil.RWMutex - nodeIDtoAddr map[roachpb.NodeID]net.Addr - } - - nodeDialer *nodedialer.Dialer - transport *kvserver.RaftTransport - - // The per-store clocks slice normally contains aliases of - // multiTestContext.clock, but it may be populleaseholderInfoated before Start() to - // use distinct clocks per store. 
- clocks []*hlc.Clock - engines []storage.Engine - grpcServers []*grpc.Server - distSenders []*kvcoord.DistSender - dbs []*kv.DB - gossips []*gossip.Gossip - storePools []*kvserver.StorePool - // We use multiple stoppers so we can restart different parts of the - // test individually. transportStopper is for 'transport', and the - // 'stoppers' slice corresponds to the 'stores'. - transportStopper *stop.Stopper - engineStoppers []*stop.Stopper - - // The fields below may mutate at runtime so the pointers they contain are - // protected by 'mu'. - mu *syncutil.RWMutex - senders []*kvserver.Stores - stores []*kvserver.Store - stoppers []*stop.Stopper - idents []roachpb.StoreIdent - nodeLivenesses []*liveness.NodeLiveness -} - -func (m *multiTestContext) getNodeIDAddress(nodeID roachpb.NodeID) (net.Addr, error) { - m.nodeIDtoAddrMu.RLock() - addr, ok := m.nodeIDtoAddrMu.nodeIDtoAddr[nodeID] - m.nodeIDtoAddrMu.RUnlock() - if ok { - return addr, nil - } - return nil, errors.Errorf("unknown peer %d", nodeID) -} - -func (m *multiTestContext) Start(t testing.TB, numStores int) { - { - // Only the fields we nil out below can be injected into m as it - // starts up, so fail early if anything else was set (as we'd likely - // override it and the test wouldn't get what it wanted). - mCopy := *m - mCopy.storeConfig = nil - mCopy.clocks = nil - mCopy.engines = nil - mCopy.engineStoppers = nil - mCopy.startWithSingleRange = false - mCopy.rpcTestingKnobs = rpc.ContextTestingKnobs{} - var empty multiTestContext - if !reflect.DeepEqual(empty, mCopy) { - t.Fatalf("illegal fields set in multiTestContext:\n%s", pretty.Diff(empty, mCopy)) - } - } - - m.t = t - - m.nodeIDtoAddrMu.RWMutex = &syncutil.RWMutex{} - m.mu = &syncutil.RWMutex{} - m.stores = make([]*kvserver.Store, numStores) - m.storePools = make([]*kvserver.StorePool, numStores) - m.distSenders = make([]*kvcoord.DistSender, numStores) - m.dbs = make([]*kv.DB, numStores) - m.stoppers = make([]*stop.Stopper, numStores) - m.senders = make([]*kvserver.Stores, numStores) - m.idents = make([]roachpb.StoreIdent, numStores) - m.grpcServers = make([]*grpc.Server, numStores) - m.gossips = make([]*gossip.Gossip, numStores) - m.nodeLivenesses = make([]*liveness.NodeLiveness, numStores) - - if m.storeConfig != nil && m.storeConfig.Clock != nil { - require.Nil(t, m.manualClock, "can't use manual clock; storeConfig.Clock is set") - require.Empty(t, m.clocks, "can't populate .clocks; storeConfig.Clock is set") - m.clocks = []*hlc.Clock{m.storeConfig.Clock} - } else if len(m.clocks) == 0 { - if m.manualClock == nil { - m.manualClock = hlc.NewManualClock(123) - } - m.clocks = []*hlc.Clock{hlc.NewClock(m.manualClock.UnixNano, time.Nanosecond)} - } - - if m.storeConfig != nil { - // Either they're equal, or the left is initially nil (see the golf - // above). - m.storeConfig.Clock = m.clocks[0] - } - - if m.transportStopper == nil { - m.transportStopper = stop.NewStopper() - } - st := cluster.MakeTestingClusterSettings() - if m.rpcContext == nil { - m.rpcContext = rpc.NewContext(rpc.ContextOptions{ - TenantID: roachpb.SystemTenantID, - AmbientCtx: log.AmbientContext{Tracer: st.Tracer}, - Config: &base.Config{Insecure: true}, - Clock: m.clock(), - Stopper: m.transportStopper, - Settings: st, - Knobs: m.rpcTestingKnobs, - }) - // Ensure that tests using this test context and restart/shut down - // their servers do not inadvertently start talking to servers from - // unrelated concurrent tests. 
- m.rpcContext.ClusterID.Set(context.Background(), uuid.MakeV4()) - // We are sharing the same RPC context for all simulated nodes, so we can't enforce - // some of the RPC check validation. - m.rpcContext.TestingAllowNamedRPCToAnonymousServer = true - - // Create a breaker which never trips and never backs off to avoid - // introducing timing-based flakes. - m.rpcContext.BreakerFactory = func() *circuit.Breaker { - return circuit.NewBreakerWithOptions(&circuit.Options{ - BackOff: &backoff.ZeroBackOff{}, - }) - } - } - m.nodeDialer = nodedialer.New(m.rpcContext, m.getNodeIDAddress) - m.transport = kvserver.NewRaftTransport( - log.AmbientContext{Tracer: st.Tracer}, st, - m.nodeDialer, nil, m.transportStopper, - ) - - for idx := 0; idx < numStores; idx++ { - m.addStore(idx) - } - - // Wait for gossip to startup. - testutils.SucceedsSoon(t, func() error { - for i, g := range m.gossips { - if cfg := g.GetSystemConfig(); cfg == nil { - return errors.Errorf("system config not available at index %d", i) - } - } - return nil - }) -} - -func (m *multiTestContext) clock() *hlc.Clock { - return m.clocks[0] -} - -func (m *multiTestContext) Stop() { - done := make(chan struct{}) - go func() { - defer func() { - if r := recover(); r != nil { - m.t.Errorf("mtc.Stop() panicked: %+v", r) - } - }() - m.mu.RLock() - - // Quiesce everyone in parallel (before the transport stopper) to avoid - // deadlocks. - var wg sync.WaitGroup - wg.Add(len(m.stoppers)) - for _, s := range m.stoppers { - go func(s *stop.Stopper) { - defer wg.Done() - // Some Stoppers may be nil if stopStore has been called - // without restartStore. - if s != nil { - // TODO(tschottdorf): seems like it *should* be possible to - // call .Stop() directly, but then stressing essentially - // any test (TestRaftAfterRemove is a good example) results - // in deadlocks where a task can't finish because of - // getting stuck in addWriteCommand. - s.Quiesce(context.Background()) - } - }(s) - } - m.mu.RUnlock() - wg.Wait() - - m.mu.RLock() - defer m.mu.RUnlock() - for _, stopper := range m.stoppers { - if stopper != nil { - stopper.Stop(context.Background()) - } - } - m.transportStopper.Stop(context.Background()) - - for _, s := range m.engineStoppers { - s.Stop(context.Background()) - } - close(done) - }() - - select { - case <-done: - case <-time.After(30 * time.Second): - // If we've already failed, just attach another failure to the - // test, since a timeout during shutdown after a failure is - // probably not interesting, and will prevent the display of any - // pending t.Error. If we're timing out but the test was otherwise - // a success, panic so we see stack traces from other goroutines. 
- if m.t.Failed() { - m.t.Error("timed out during shutdown") - } else { - panic("timed out during shutdown") - } - } - - m.mu.RLock() - defer m.mu.RUnlock() - for _, s := range m.stores { - if s != nil { - s.AssertInvariants() - } - } -} - -type multiTestContextKVTransport struct { - mtc *multiTestContext - idx int - replicas kvcoord.ReplicaSlice - mu struct { - syncutil.Mutex - pending map[roachpb.ReplicaID]struct{} - } -} - -func (m *multiTestContext) kvTransportFactory( - _ kvcoord.SendOptions, _ *nodedialer.Dialer, replicas kvcoord.ReplicaSlice, -) (kvcoord.Transport, error) { - t := &multiTestContextKVTransport{ - mtc: m, - replicas: replicas, - } - t.mu.pending = map[roachpb.ReplicaID]struct{}{} - return t, nil -} - -func (t *multiTestContextKVTransport) String() string { - return fmt.Sprintf("%T: replicas=%v, idx=%d", t, t.replicas, t.idx) -} - -func (t *multiTestContextKVTransport) IsExhausted() bool { - return t.idx == len(t.replicas) -} - -// magicMultiTestContextKVTransportError can be returned by kvserver from an RPC -// to ask the multiTestContextKVTransport to inject an RPC error. This will -// cause the DistSender to consider the result ambiguous and to try the next -// replica. This is useful for triggering DistSender retries *after* the request -// has already evaluated. -const magicMultiTestContextKVTransportError = "inject RPC error (magicMultiTestContextKVTransportError)" - -func (t *multiTestContextKVTransport) SendNext( - ctx context.Context, ba roachpb.BatchRequest, -) (*roachpb.BatchResponse, error) { - if ctx.Err() != nil { - return nil, errors.Wrap(ctx.Err(), "send context is canceled") - } - rep := t.replicas[t.idx] - t.idx++ - t.setPending(rep.ReplicaID, true) - - // Node IDs are assigned in the order the nodes are created by - // the multi test context, so we can derive the index for stoppers - // and senders by subtracting 1 from the node ID. - nodeIndex := int(rep.NodeID) - 1 - log.VEventf(ctx, 2, "SendNext nodeIndex=%d", nodeIndex) - - // This method crosses store boundaries: it is possible that the - // destination store is stopped while the source is still running. - // Run the send in a Task on the destination store to simulate what - // would happen with real RPCs. - t.mtc.mu.RLock() - s := t.mtc.stoppers[nodeIndex] - sender := t.mtc.senders[nodeIndex] - t.mtc.mu.RUnlock() - - if s == nil { - t.setPending(rep.ReplicaID, false) - return nil, errors.New("store is stopped") - } - - // Clone txn of ba args for sending. - ba.Replica = rep.ReplicaDescriptor - if txn := ba.Txn; txn != nil { - ba.Txn = ba.Txn.Clone() - } - var br *roachpb.BatchResponse - var pErr *roachpb.Error - if err := s.RunTask(ctx, "mtc send", func(ctx context.Context) { - // Clear the caller's tags to simulate going to a different node. Otherwise - // logs get tags from both the sender node and the receiver node, - // confusingly. - // TODO(andrei): Don't clear the tags if the RPC is going to the same node - // as the sender. We'd have to tell the multiTestContextKVTransport who the - // sender is. - callCtx := logtags.WithTags(ctx, nil /* tags */) - br, pErr = sender.Send(callCtx, ba) - }); err != nil { - pErr = roachpb.NewError(err) - } - if pErr != nil && strings.Contains(pErr.GoError().Error(), magicMultiTestContextKVTransportError) { - // We've been asked to inject an RPC error. This will cause the DistSender - // to consider the result ambiguous and to try the next replica. 
- return nil, errors.New("error injected by multiTestContextKVTransport after request has been evaluated") - } - if br == nil { - br = &roachpb.BatchResponse{} - } - if br.Error != nil { - panic(roachpb.ErrorUnexpectedlySet(sender, br)) - } - br.Error = pErr - - // On certain errors, we must expire leases to ensure that the - // next attempt has a chance of succeeding. - switch tErr := pErr.GetDetail().(type) { - case *roachpb.NotLeaseHolderError: - if leaseHolder := tErr.LeaseHolder; leaseHolder != nil { - t.mtc.mu.RLock() - leaseHolderStore := t.mtc.stores[leaseHolder.NodeID-1] - t.mtc.mu.RUnlock() - if leaseHolderStore == nil { - // The lease holder is known but down, so expire its lease. - if t.mtc.manualClock != nil { - t.mtc.advanceClock(ctx) - } - } - } else { - // stores has the range, is *not* the lease holder, but the - // lease holder is not known; this can happen if the lease - // holder is removed from the group. Move the manual clock - // forward in an attempt to expire the lease. - if t.mtc.manualClock != nil { - t.mtc.advanceClock(ctx) - } - } - } - t.setPending(rep.ReplicaID, false) - return br, nil -} - -func (t *multiTestContextKVTransport) NextInternalClient( - ctx context.Context, -) (context.Context, roachpb.InternalClient, error) { - panic("unimplemented") -} - -func (t *multiTestContextKVTransport) NextReplica() roachpb.ReplicaDescriptor { - if t.IsExhausted() { - return roachpb.ReplicaDescriptor{} - } - return t.replicas[t.idx].ReplicaDescriptor -} - -func (t *multiTestContextKVTransport) SkipReplica() { - if t.IsExhausted() { - return - } - t.idx++ -} - -func (t *multiTestContextKVTransport) MoveToFront(replica roachpb.ReplicaDescriptor) { - t.mu.Lock() - defer t.mu.Unlock() - if _, ok := t.mu.pending[replica.ReplicaID]; ok { - return - } - for i := range t.replicas { - if t.replicas[i].ReplicaDescriptor == replica { - if i < t.idx { - t.idx-- - } - // Swap the client representing this replica to the front. - t.replicas[i], t.replicas[t.idx] = t.replicas[t.idx], t.replicas[i] - return - } - } -} - -func (t *multiTestContextKVTransport) Release() {} - -func (t *multiTestContextKVTransport) setPending(repID roachpb.ReplicaID, pending bool) { - t.mu.Lock() - defer t.mu.Unlock() - if pending { - t.mu.pending[repID] = struct{}{} - } else { - delete(t.mu.pending, repID) - } -} - -// rangeDescByAge implements sort.Interface for RangeDescriptor, sorting by the -// age of the RangeDescriptor. This is intended to find the most recent version -// of the same RangeDescriptor, when multiple versions of it are available. -type rangeDescByAge []*roachpb.RangeDescriptor - -func (rd rangeDescByAge) Len() int { return len(rd) } -func (rd rangeDescByAge) Swap(i, j int) { rd[i], rd[j] = rd[j], rd[i] } -func (rd rangeDescByAge) Less(i, j int) bool { - // "Less" means "older" according to this sort. - // A RangeDescriptor version with a higher NextReplicaID is always more recent. - if rd[i].NextReplicaID != rd[j].NextReplicaID { - return rd[i].NextReplicaID < rd[j].NextReplicaID - } - // If two RangeDescriptor versions have the same NextReplicaID, then the one - // with the fewest replicas is the newest. - return len(rd[i].InternalReplicas) > len(rd[j].InternalReplicas) -} - -// FirstRange implements the RangeDescriptorDB interface. It returns the range -// descriptor which contains roachpb.KeyMin. -// -// DistSender's implementation of FirstRange() does not work correctly because -// the gossip network used by multiTestContext is only partially operational. 
-func (m *multiTestContext) FirstRange() (*roachpb.RangeDescriptor, error) { - m.mu.RLock() - defer m.mu.RUnlock() - var descs []*roachpb.RangeDescriptor - for _, str := range m.senders { - // Node liveness heartbeats start quickly, sometimes before the first - // range would be available here and before we've added all ranges. - if str == nil { - continue - } - // Find every version of the RangeDescriptor for the first range by - // querying all stores; it may not be present on all stores, but the - // current version is guaranteed to be present on one of them as long - // as all stores are alive. - if err := str.VisitStores(func(s *kvserver.Store) error { - firstRng := s.LookupReplica(roachpb.RKeyMin) - if firstRng != nil { - descs = append(descs, firstRng.Desc()) - } - return nil - }); err != nil { - m.t.Fatalf("no error should be possible from this invocation of VisitStores, but found %s", err) - } - } - if len(descs) == 0 { - return nil, errors.New("first Range is not present on any live store in the multiTestContext") - } - // Sort the RangeDescriptor versions by age and return the most recent - // version. - sort.Sort(rangeDescByAge(descs)) - return descs[len(descs)-1], nil -} - -func (m *multiTestContext) makeStoreConfig(i int) kvserver.StoreConfig { - var cfg kvserver.StoreConfig - if m.storeConfig != nil { - cfg = *m.storeConfig - cfg.Clock = m.clocks[i] - } else { - cfg = kvserver.TestStoreConfig(m.clocks[i]) - m.storeConfig = &cfg - } - cfg.NodeDialer = m.nodeDialer - cfg.Transport = m.transport - cfg.Gossip = m.gossips[i] - cfg.TestingKnobs.DisableMergeQueue = true - cfg.TestingKnobs.DisableSplitQueue = true - cfg.TestingKnobs.ReplicateQueueAcceptsUnsplit = true - // The mtc does not populate the allocator's store pool well and so - // the check never sees any live replicas. 
- cfg.TestingKnobs.AllowDangerousReplicationChanges = true - - return cfg -} - -var _ rangecache.RangeDescriptorDB = mtcRangeDescriptorDB{} - -type mtcRangeDescriptorDB struct { - *multiTestContext - ds **kvcoord.DistSender -} - -func (mrdb mtcRangeDescriptorDB) RangeLookup( - ctx context.Context, key roachpb.RKey, useReverseScan bool, -) ([]roachpb.RangeDescriptor, []roachpb.RangeDescriptor, error) { - return (*mrdb.ds).RangeLookup(ctx, key, useReverseScan) -} - -func (m *multiTestContext) populateDB(idx int, st *cluster.Settings, stopper *stop.Stopper) { - retryOpts := base.DefaultRetryOptions() - retryOpts.Closer = stopper.ShouldQuiesce() - ambient := m.storeConfig.AmbientCtx - m.distSenders[idx] = kvcoord.NewDistSender(kvcoord.DistSenderConfig{ - AmbientCtx: ambient, - Clock: m.clocks[idx], - NodeDescs: m.gossips[idx], - RPCContext: m.rpcContext, - RangeDescriptorDB: mtcRangeDescriptorDB{ - multiTestContext: m, - ds: &m.distSenders[idx], - }, - Settings: st, - TestingKnobs: kvcoord.ClientTestingKnobs{ - TransportFactory: m.kvTransportFactory, - }, - RPCRetryOptions: &retryOpts, - }) - tcsFactory := kvcoord.NewTxnCoordSenderFactory( - kvcoord.TxnCoordSenderFactoryConfig{ - AmbientCtx: ambient, - Settings: m.storeConfig.Settings, - Clock: m.clocks[idx], - Stopper: stopper, - }, - m.distSenders[idx], - ) - m.dbs[idx] = kv.NewDB(ambient, tcsFactory, m.clocks[idx], stopper) -} - -func (m *multiTestContext) populateStorePool( - idx int, cfg kvserver.StoreConfig, nodeLiveness *liveness.NodeLiveness, -) { - m.storePools[idx] = kvserver.NewStorePool( - cfg.AmbientCtx, - cfg.Settings, - m.gossips[idx], - m.clocks[idx], - nodeLiveness.GetNodeCount, - kvserver.MakeStorePoolNodeLivenessFunc(nodeLiveness), - /* deterministic */ false, - ) -} - -// AddStore creates a new store on the same Transport but doesn't create any ranges. -func (m *multiTestContext) addStore(idx int) { - var clock *hlc.Clock - if len(m.clocks) > idx { - clock = m.clocks[idx] - } else { - clock = m.storeConfig.Clock - m.clocks = append(m.clocks, clock) - } - var eng storage.Engine - var needBootstrap bool - if len(m.engines) > idx { - eng = m.engines[idx] - _, err := kvserver.ReadStoreIdent(context.Background(), eng) - if errors.HasType(err, (*kvserver.NotBootstrappedError)(nil)) { - needBootstrap = true - } else if err != nil { - m.t.Fatal(err) - } - } else { - engineStopper := stop.NewStopper() - m.engineStoppers = append(m.engineStoppers, engineStopper) - eng = storage.NewDefaultInMemForTesting() - engineStopper.AddCloser(eng) - m.engines = append(m.engines, eng) - needBootstrap = true - } - grpcServer := rpc.NewServer(m.rpcContext) - m.grpcServers[idx] = grpcServer - kvserver.RegisterMultiRaftServer(grpcServer, m.transport) - - stopper := stop.NewStopper() - - // Give this store the first store as a resolver. We don't provide all of the - // previous stores as resolvers as doing so can cause delays in bringing the - // gossip network up. 
- resolvers := func() []resolver.Resolver { - m.nodeIDtoAddrMu.Lock() - defer m.nodeIDtoAddrMu.Unlock() - addr := m.nodeIDtoAddrMu.nodeIDtoAddr[1] - if addr == nil { - return nil - } - r, err := resolver.NewResolverFromAddress(addr) - if err != nil { - m.t.Fatal(err) - } - return []resolver.Resolver{r} - }() - m.gossips[idx] = gossip.NewTest( - roachpb.NodeID(idx+1), - m.rpcContext, - grpcServer, - m.transportStopper, - metric.NewRegistry(), - zonepb.DefaultZoneConfigRef(), - ) - - nodeID := roachpb.NodeID(idx + 1) - cfg := m.makeStoreConfig(idx) - ambient := log.AmbientContext{Tracer: cfg.Settings.Tracer} - ambient.AddLogTag("n", nodeID) - - m.populateDB(idx, cfg.Settings, stopper) - nlActive, nlRenewal := cfg.NodeLivenessDurations() - m.nodeLivenesses[idx] = liveness.NewNodeLiveness(liveness.NodeLivenessOptions{ - AmbientCtx: ambient, Clock: m.clocks[idx], DB: m.dbs[idx], Gossip: m.gossips[idx], - LivenessThreshold: nlActive, RenewalDuration: nlRenewal, Settings: cfg.Settings, - HistogramWindowInterval: metric.TestSampleInterval, - }) - m.populateStorePool(idx, cfg, m.nodeLivenesses[idx]) - cfg.DB = m.dbs[idx] - cfg.NodeLiveness = m.nodeLivenesses[idx] - cfg.StorePool = m.storePools[idx] - - ctx := context.Background() - if needBootstrap { - require.NoError(m.t, kvserver.WriteClusterVersion(ctx, eng, clusterversion.TestingClusterVersion)) - if err := kvserver.InitEngine(ctx, eng, roachpb.StoreIdent{ - NodeID: roachpb.NodeID(idx + 1), - StoreID: roachpb.StoreID(idx + 1), - }); err != nil { - m.t.Fatal(err) - } - } - if needBootstrap && idx == 0 { - // Bootstrap the initial range on the first engine. - var splits []roachpb.RKey - kvs, tableSplits := bootstrap.MakeMetadataSchema( - keys.SystemSQLCodec, cfg.DefaultZoneConfig, cfg.DefaultSystemZoneConfig, - ).GetInitialValues() - if !m.startWithSingleRange { - splits = config.StaticSplits() - splits = append(splits, tableSplits...) - sort.Slice(splits, func(i, j int) bool { - return splits[i].Less(splits[j]) - }) - } - err := kvserver.WriteInitialClusterData( - ctx, - eng, - kvs, /* initialValues */ - clusterversion.TestingBinaryVersion, - len(m.engines), splits, cfg.Clock.PhysicalNow(), - cfg.TestingKnobs, - ) - if err != nil { - m.t.Fatal(err) - } - } - store := kvserver.NewStore(ctx, cfg, eng, &roachpb.NodeDescriptor{NodeID: nodeID}) - if err := store.Start(ctx, stopper); err != nil { - m.t.Fatal(err) - } - - sender := kvserver.NewStores(ambient, clock) - sender.AddStore(store) - server := kvserver.MakeServer(&roachpb.NodeDescriptor{NodeID: nodeID}, sender) - kvserver.RegisterPerReplicaServer(grpcServer, server) - kvserver.RegisterPerStoreServer(grpcServer, server) - - ln, err := netutil.ListenAndServeGRPC(m.transportStopper, grpcServer, util.TestAddr) - if err != nil { - m.t.Fatal(err) - } - m.nodeIDtoAddrMu.Lock() - if m.nodeIDtoAddrMu.nodeIDtoAddr == nil { - m.nodeIDtoAddrMu.nodeIDtoAddr = make(map[roachpb.NodeID]net.Addr) - } - _, ok := m.nodeIDtoAddrMu.nodeIDtoAddr[nodeID] - if !ok { - m.nodeIDtoAddrMu.nodeIDtoAddr[nodeID] = ln.Addr() - } - m.nodeIDtoAddrMu.Unlock() - if ok { - m.t.Fatalf("node %d already listening", nodeID) - } - - // Add newly created objects to the multiTestContext's collections. - // (these must be populated before the store is started so that - // FirstRange() can find the sender) - m.mu.Lock() - m.stores[idx] = store - m.stoppers[idx] = stopper - m.senders[idx] = sender - // Save the store identities for later so we can use them in - // replication operations even while the store is stopped. 
- m.idents[idx] = *store.Ident - m.mu.Unlock() - - // NB: On Mac OS X, we sporadically see excessively long dialing times (~15s) - // which cause various trickle down badness in tests. To avoid every test - // having to worry about such conditions we pre-warm the connection - // cache. See #8440 for an example of the headaches the long dial times - // cause. - if _, err := m.rpcContext.GRPCDialNode(ln.Addr().String(), nodeID, - rpc.DefaultClass).Connect(ctx); err != nil { - m.t.Fatal(err) - } - - m.gossips[idx].Start(ln.Addr(), resolvers) - - if err := m.gossipNodeDesc(m.gossips[idx], nodeID); err != nil { - m.t.Fatal(err) - } - - ran := struct { - sync.Once - ch chan struct{} - }{ - ch: make(chan struct{}), - } - if idx != 0 { - // Given multiTestContext does not make use of the join RPC, we have to - // manually write out liveness records for each node to maintain the - // invariant that all nodes have liveness records present before they - // start heartbeating. - if err := m.nodeLivenesses[idx].CreateLivenessRecord(ctx, nodeID); err != nil { - m.t.Fatal(err) - } - } - m.nodeLivenesses[idx].Start(ctx, - liveness.NodeLivenessStartOptions{ - Stopper: stopper, - Engines: m.engines[idx : idx+1], - OnSelfLive: func(ctx context.Context) { - now := clock.Now() - if err := store.WriteLastUpTimestamp(ctx, now); err != nil { - log.Warningf(ctx, "%v", err) - } - ran.Do(func() { - close(ran.ch) - }) - }}) - - store.WaitForInit() - - // Wait until we see the first heartbeat by waiting for the callback (which - // fires *after* the node becomes live). - <-ran.ch -} - -func (m *multiTestContext) nodeDesc(nodeID roachpb.NodeID) *roachpb.NodeDescriptor { - addr := m.nodeIDtoAddrMu.nodeIDtoAddr[nodeID] - return &roachpb.NodeDescriptor{ - NodeID: nodeID, - Address: util.MakeUnresolvedAddr(addr.Network(), addr.String()), - } -} - -// gossipNodeDesc adds the node descriptor to the gossip network. -// Mostly makes sure that we don't see a warning per request. -func (m *multiTestContext) gossipNodeDesc(g *gossip.Gossip, nodeID roachpb.NodeID) error { - return g.SetNodeDescriptor(m.nodeDesc(nodeID)) -} - -// StopStore stops a store but leaves the engine intact. -// All stopped stores must be restarted before multiTestContext.Stop is called. -func (m *multiTestContext) stopStore(i int) { - // Attempting to acquire a write lock here could lead to a situation in - // which an outstanding Raft proposal would never return due to address - // resolution calling back into the `multiTestContext` and attempting to - // acquire a read lock while this write lock is block on another read lock - // held by `SendNext` which in turn is waiting on that Raft proposal: - // - // SendNext[hold RLock] -> Raft[want RLock] - // ĘŚ / - // \ v - // stopStore[want Lock] - // - // Instead, we only acquire a read lock to fetch the stopper, and are - // careful not to hold any locks while stopping the stopper. - m.mu.RLock() - stopper := m.stoppers[i] - m.mu.RUnlock() - - stopper.Stop(context.Background()) - - m.mu.Lock() - m.stoppers[i] = nil - // Break the transport breakers for this node so that messages sent between - // a store stopping and that store restarting will never remain in-flight in - // the transport and end up reaching the store. This has been the cause of - // flakiness in the past. 
- m.transport.GetCircuitBreaker(m.idents[i].NodeID, rpc.DefaultClass).Break() - m.transport.GetCircuitBreaker(m.idents[i].NodeID, rpc.SystemClass).Break() - m.senders[i].RemoveStore(m.stores[i]) - m.stores[i] = nil - m.mu.Unlock() -} - -// restartStore restarts a store previously stopped with StopStore. It does not -// wait for the store to successfully perform a heartbeat before returning. This -// is important for tests where a restarted store may not be able to heartbeat -// immediately. -func (m *multiTestContext) restartStoreWithoutHeartbeat(i int) { - m.mu.Lock() - stopper := stop.NewStopper() - m.stoppers[i] = stopper - cfg := m.makeStoreConfig(i) - m.populateDB(i, m.storeConfig.Settings, stopper) - nlActive, nlRenewal := cfg.NodeLivenessDurations() - m.nodeLivenesses[i] = liveness.NewNodeLiveness(liveness.NodeLivenessOptions{ - AmbientCtx: log.AmbientContext{Tracer: m.storeConfig.Settings.Tracer}, Clock: m.clocks[i], DB: m.dbs[i], - Gossip: m.gossips[i], LivenessThreshold: nlActive, RenewalDuration: nlRenewal, Settings: cfg.Settings, - HistogramWindowInterval: metric.TestSampleInterval, - }) - m.populateStorePool(i, cfg, m.nodeLivenesses[i]) - cfg.DB = m.dbs[i] - cfg.NodeLiveness = m.nodeLivenesses[i] - cfg.StorePool = m.storePools[i] - ctx := context.Background() - store := kvserver.NewStore(ctx, cfg, m.engines[i], &roachpb.NodeDescriptor{NodeID: roachpb.NodeID(i + 1)}) - m.stores[i] = store - - // Need to start the store before adding it so that the store ID is initialized. - if err := store.Start(ctx, stopper); err != nil { - m.t.Fatal(err) - } - m.senders[i].AddStore(store) - m.transport.GetCircuitBreaker(m.idents[i].NodeID, rpc.DefaultClass).Reset() - m.transport.GetCircuitBreaker(m.idents[i].NodeID, rpc.SystemClass).Reset() - m.mu.Unlock() - cfg.NodeLiveness.Start(ctx, liveness.NodeLivenessStartOptions{ - Stopper: stopper, - Engines: m.engines[i : i+1], - OnSelfLive: func(ctx context.Context) { - now := m.clocks[i].Now() - if err := store.WriteLastUpTimestamp(ctx, now); err != nil { - log.Warningf(ctx, "%v", err) - } - }, - }) -} - -// restartStore restarts a store previously stopped with StopStore. -func (m *multiTestContext) restartStore(i int) { - m.restartStoreWithoutHeartbeat(i) - - // Wait until we see the first heartbeat. - liveness := m.nodeLivenesses[i] - testutils.SucceedsSoon(m.t, func() error { - if live, err := liveness.IsLive(roachpb.NodeID(i + 1)); err != nil || !live { - return errors.New("node not live") - } - return nil - }) -} - -func (m *multiTestContext) Store(i int) *kvserver.Store { - m.mu.RLock() - defer m.mu.RUnlock() - return m.stores[i] -} - -// findStartKeyLocked returns the start key of the given range. -func (m *multiTestContext) findStartKeyLocked(rangeID roachpb.RangeID) roachpb.RKey { - // We can use the first store that returns results because the start - // key never changes. - for _, s := range m.stores { - rep, err := s.GetReplica(rangeID) - if err == nil && rep.IsInitialized() { - return rep.Desc().StartKey - } - } - m.t.Fatalf("couldn't find range %s on any store", rangeID) - return nil // unreached, but the compiler can't tell. -} - -// changeReplicas performs a ChangeReplicas operation, retrying until the -// destination store has been addded or removed. Returns the range's -// NextReplicaID, which is the ID of the newly-added replica if this is an add. 
-func (m *multiTestContext) changeReplicas( - startKey roachpb.RKey, dest int, changeType roachpb.ReplicaChangeType, -) (roachpb.ReplicaID, error) { - ctx := context.Background() - - retryOpts := retry.Options{ - InitialBackoff: time.Millisecond, - MaxBackoff: 50 * time.Millisecond, - } - var desc roachpb.RangeDescriptor - for r := retry.Start(retryOpts); r.Next(); { - - // Perform a consistent read to get the updated range descriptor (as - // opposed to just going to one of the stores), to make sure we have - // the effects of any previous ChangeReplicas call. By the time - // ChangeReplicas returns the raft leader is guaranteed to have the - // updated version, but followers are not. - require.NoError(m.t, m.dbs[0].GetProto(ctx, keys.RangeDescriptorKey(startKey), &desc)) - - _, err := m.dbs[0].AdminChangeReplicas( - ctx, startKey.AsRawKey(), - desc, - roachpb.MakeReplicationChanges( - changeType, - roachpb.ReplicationTarget{ - NodeID: m.idents[dest].NodeID, - StoreID: m.idents[dest].StoreID, - }), - ) - - if err == nil { - break - } - - // There was an error. Refresh the range descriptor and check if we're already done. - // - // NB: this could get smarter around non-voters. Hasn't been necessary so far. - require.NoError(m.t, m.dbs[0].GetProto(ctx, keys.RangeDescriptorKey(startKey), &desc)) - if changeType == roachpb.ADD_VOTER { - if _, ok := desc.GetReplicaDescriptor(m.idents[dest].StoreID); ok { - break - } - } else if changeType == roachpb.REMOVE_VOTER { - if _, ok := desc.GetReplicaDescriptor(m.idents[dest].StoreID); !ok { - break - } - } - - if errors.HasType(err, (*roachpb.AmbiguousResultError)(nil)) { - // Try again after an AmbiguousResultError. If the operation - // succeeded, then the next attempt will return alreadyDoneErr; - // if it failed then the next attempt should succeed. - continue - } - - if kvserver.IsRetriableReplicationChangeError(err) { - log.Infof(ctx, "%v", err) - continue - } - - return 0, err - } - - return desc.NextReplicaID, nil -} - -// replicateRange replicates the given range onto the given destination stores. The destinations -// are indicated by indexes within m.stores. -func (m *multiTestContext) replicateRange(rangeID roachpb.RangeID, dests ...int) { - m.t.Helper() - if err := m.replicateRangeNonFatal(rangeID, dests...); err != nil { - m.t.Fatal(err) - } -} - -// replicateRangeNonFatal replicates the given range onto the given stores. -func (m *multiTestContext) replicateRangeNonFatal(rangeID roachpb.RangeID, dests ...int) error { - m.mu.RLock() - startKey := m.findStartKeyLocked(rangeID) - m.mu.RUnlock() - - expectedReplicaIDs := make([]roachpb.ReplicaID, len(dests)) - for i, dest := range dests { - var err error - expectedReplicaIDs[i], err = m.changeReplicas(startKey, dest, roachpb.ADD_VOTER) - if err != nil { - return err - } - } - - // Wait for the replication to complete on all destination nodes. 
- return retry.ForDuration(testutils.DefaultSucceedsSoonDuration, func() error { - for i, dest := range dests { - repl, err := m.stores[dest].GetReplica(rangeID) - if err != nil { - return err - } - repDesc, err := repl.GetReplicaDescriptor() - if err != nil { - return err - } - if e := expectedReplicaIDs[i]; repDesc.ReplicaID != e { - return errors.Errorf("expected replica %s to have ID %d", repl, e) - } - if t := repDesc.GetType(); t != roachpb.VOTER_FULL { - return errors.Errorf("expected replica %s to be a voter was %s", repl, t) - } - if !repl.Desc().ContainsKey(startKey) { - return errors.Errorf("expected replica %s to contain %s", repl, startKey) - } - } - return nil - }) -} - -// unreplicateRange removes a replica of the range from the dest store. -func (m *multiTestContext) unreplicateRange(rangeID roachpb.RangeID, dest int) { - m.t.Helper() - if err := m.unreplicateRangeNonFatal(rangeID, dest); err != nil { - m.t.Fatal(err) - } -} - -// unreplicateRangeNonFatal removes a replica of the range from the dest store. -// Returns an error rather than calling m.t.Fatal upon error. -func (m *multiTestContext) unreplicateRangeNonFatal(rangeID roachpb.RangeID, dest int) error { - m.mu.RLock() - startKey := m.findStartKeyLocked(rangeID) - m.mu.RUnlock() - - _, err := m.changeReplicas(startKey, dest, roachpb.REMOVE_VOTER) - return err -} - -// waitForUnreplicated waits until no replica exists for the specified range -// on the dest store. -func (m *multiTestContext) waitForUnreplicated(rangeID roachpb.RangeID, dest int) error { - // Wait for the unreplications to complete on destination node. - return retry.ForDuration(testutils.DefaultSucceedsSoonDuration, func() error { - _, err := m.stores[dest].GetReplica(rangeID) - if err == nil { - return fmt.Errorf("replica still exists on dest %d", dest) - } else if errors.HasType(err, (*roachpb.RangeNotFoundError)(nil)) { - return nil - } - return err - }) -} - -// readIntFromEngines reads the current integer value at the given key -// from all configured engines, filling in zeros when the value is not -// found. Returns a slice of the same length as mtc.engines. -func (m *multiTestContext) readIntFromEngines(key roachpb.Key) []int64 { - results := make([]int64, len(m.engines)) - for i, eng := range m.engines { - val, _, err := storage.MVCCGet(context.Background(), eng, key, m.clocks[i].Now(), - storage.MVCCGetOptions{}) - if err != nil { - log.VEventf(context.Background(), 1, "engine %d: error reading from key %s: %s", i, key, err) - } else if val == nil { - log.VEventf(context.Background(), 1, "engine %d: missing key %s", i, key) - } else { - results[i], err = val.GetInt() - if err != nil { - log.Errorf(context.Background(), "engine %d: error decoding %s from key %s: %+v", i, val, key, err) - } - } - } - return results -} - -// waitForValuesT is like waitForValues but allows the caller to provide a -// testing.T which may differ from m.t. -func (m *multiTestContext) waitForValuesT(t testing.TB, key roachpb.Key, expected []int64) { - t.Helper() - testutils.SucceedsSoon(t, func() error { - actual := m.readIntFromEngines(key) - if !reflect.DeepEqual(expected, actual) { - return errors.Errorf("expected %v, got %v", expected, actual) - } - return nil - }) -} - -// waitForValues waits for the integer values at the given key to match the -// expected slice (across all engines). Fails the test if they do not match -// after the SucceedsSoon period. 
-func (m *multiTestContext) waitForValues(key roachpb.Key, expected []int64) { - m.t.Helper() - m.waitForValuesT(m.t, key, expected) -} - -// transferLease transfers the lease for the given range from the source -// replica to the target replica. Assumes that the caller knows who the -// current leaseholder is. -func (m *multiTestContext) transferLease( - ctx context.Context, rangeID roachpb.RangeID, source int, dest int, -) { - if err := m.transferLeaseNonFatal(ctx, rangeID, source, dest); err != nil { - m.t.Fatal(err) - } -} - -// transferLease transfers the lease for the given range from the source -// replica to the target replica. Assumes that the caller knows who the -// current leaseholder is. -// Returns an error rather than calling m.t.Fatal upon error. -func (m *multiTestContext) transferLeaseNonFatal( - ctx context.Context, rangeID roachpb.RangeID, source int, dest int, -) error { - live := m.stores[dest] != nil && !m.stores[dest].IsDraining() - if !live { - return errors.Errorf("can't transfer lease to down or draining node at index %d", dest) - } - - // Heartbeat the liveness record of the destination node to make sure that the - // lease we're about to transfer can be used afterwards. Otherwise, the - // liveness record might be expired and the node is considered down, making - // this transfer irrelevant. In particular, this can happen if the clock was - // advanced recently, so all the liveness records (including the destination) - // are expired. In that case, the simple fact that the transfer succeeded - // doesn't mean that the destination now has a usable lease. - if err := m.heartbeatLiveness(ctx, dest); err != nil { - return err - } - - sourceRepl, err := m.stores[source].GetReplica(rangeID) - if err != nil { - return err - } - if err := sourceRepl.AdminTransferLease(ctx, m.idents[dest].StoreID); err != nil { - return err - } - - return nil -} - -func (m *multiTestContext) heartbeatLiveness(ctx context.Context, store int) error { - m.mu.RLock() - nl := m.nodeLivenesses[store] - m.mu.RUnlock() - l, ok := nl.Self() - if !ok { - return errors.New("liveness not found") - } - - var err error - for r := retry.StartWithCtx(ctx, retry.Options{MaxRetries: 5}); r.Next(); { - if err = nl.Heartbeat(ctx, l); !errors.Is(err, liveness.ErrEpochIncremented) { - break - } - } - return err -} - -// advanceClock advances the mtc's manual clock such that all -// expiration-based leases become expired. The liveness records of all the nodes -// will also become expired on the new clock value (and this will cause all the -// epoch-based leases to be considered expired until the liveness record is -// heartbeated). -// -// This method asserts that all the stores share the manual clock. Otherwise, -// the desired effect would be ambiguous. -func (m *multiTestContext) advanceClock(ctx context.Context) { - for i, clock := range m.clocks { - if clock != m.clock() { - log.Fatalf(ctx, "clock at index %d is different from the shared clock", i) - } - } - m.manualClock.Increment(m.storeConfig.LeaseExpiration()) - log.Infof(ctx, "test clock advanced to: %s", m.clock().Now()) -} - -// getRaftLeader returns the replica that is the current raft leader for the -// specified rangeID. 
-func (m *multiTestContext) getRaftLeader(rangeID roachpb.RangeID) *kvserver.Replica { - m.t.Helper() - var raftLeaderRepl *kvserver.Replica - testutils.SucceedsSoon(m.t, func() error { - m.mu.RLock() - defer m.mu.RUnlock() - var latestTerm uint64 - for _, store := range m.stores { - raftStatus := store.RaftStatus(rangeID) - if raftStatus == nil { - // Replica does not exist on this store or there is no raft - // status yet. - continue - } - if raftStatus.Term > latestTerm || (raftLeaderRepl == nil && raftStatus.Term == latestTerm) { - // If we find any newer term, it means any previous election is - // invalid. - raftLeaderRepl = nil - latestTerm = raftStatus.Term - if raftStatus.RaftState == raft.StateLeader { - var err error - raftLeaderRepl, err = store.GetReplica(rangeID) - if err != nil { - return err - } - } - } - } - if latestTerm == 0 || raftLeaderRepl == nil { - return errors.Errorf("could not find a raft leader for range %s", rangeID) - } - return nil - }) - return raftLeaderRepl -} - // getArgs returns a GetRequest and GetResponse pair addressed to // the default replica for the specified key. func getArgs(key roachpb.Key) *roachpb.GetRequest { @@ -1536,67 +327,6 @@ func adminTransferLeaseArgs(key roachpb.Key, target roachpb.StoreID) roachpb.Req } } -func TestSortRangeDescByAge(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - var replicaDescs []roachpb.ReplicaDescriptor - var rangeDescs []*roachpb.RangeDescriptor - - expectedReplicas := 0 - nextReplID := 1 - - // Cut a new range version with the current replica set. - newRangeVersion := func(marker string) { - currentRepls := append([]roachpb.ReplicaDescriptor(nil), replicaDescs...) - rangeDescs = append(rangeDescs, &roachpb.RangeDescriptor{ - RangeID: roachpb.RangeID(1), - InternalReplicas: currentRepls, - NextReplicaID: roachpb.ReplicaID(nextReplID), - EndKey: roachpb.RKey(marker), - }) - } - - // function to add a replica. - addReplica := func(marker string) { - replicaDescs = append(replicaDescs, roachpb.ReplicaDescriptor{ - NodeID: roachpb.NodeID(nextReplID), - StoreID: roachpb.StoreID(nextReplID), - ReplicaID: roachpb.ReplicaID(nextReplID), - }) - nextReplID++ - newRangeVersion(marker) - expectedReplicas++ - } - - // function to remove a replica. - removeReplica := func(marker string) { - remove := rand.Intn(len(replicaDescs)) - replicaDescs = append(replicaDescs[:remove], replicaDescs[remove+1:]...) - newRangeVersion(marker) - expectedReplicas-- - } - - for i := 0; i < 10; i++ { - addReplica(fmt.Sprint("added", i)) - } - for i := 0; i < 3; i++ { - removeReplica(fmt.Sprint("removed", i)) - } - addReplica("final-add") - - // randomize array - sortedRangeDescs := make([]*roachpb.RangeDescriptor, len(rangeDescs)) - for i, r := range rand.Perm(len(rangeDescs)) { - sortedRangeDescs[i] = rangeDescs[r] - } - // Sort array by age. - sort.Sort(rangeDescByAge(sortedRangeDescs)) - // Make sure both arrays are equal. - if !reflect.DeepEqual(sortedRangeDescs, rangeDescs) { - t.Fatalf("RangeDescriptor sort by age was not correct. Diff: %s", pretty.Diff(sortedRangeDescs, rangeDescs)) - } -} - func verifyRangeStats( reader storage.Reader, rangeID roachpb.RangeID, expMS enginepb.MVCCStats, ) error { @@ -1649,29 +379,3 @@ func waitForTombstone( }) return tombstone } - -// This test is here to please the unused code linter, and will be removed in -// the next commit. 
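getRaftLeader keeps only a leader observed at the highest raft term seen so far; encountering any newer term discards a leader found at an older one. The same selection rule over plain values, with hypothetical raftStatus and store-ID fields standing in for the real types:

package main

import "fmt"

// raftStatus is a hypothetical, simplified view of one store's raft
// status for a single range.
type raftStatus struct {
	Term     uint64
	IsLeader bool
	StoreID  int
}

// pickLeader returns the store ID reporting leadership at the highest
// term observed, or -1 if no store is leader at that term.
func pickLeader(statuses []raftStatus) int {
	leader := -1
	var latestTerm uint64
	for _, st := range statuses {
		if st.Term > latestTerm || (leader == -1 && st.Term == latestTerm) {
			// Any newer term invalidates a leader found at an older term.
			leader = -1
			latestTerm = st.Term
			if st.IsLeader {
				leader = st.StoreID
			}
		}
	}
	return leader
}

func main() {
	fmt.Println(pickLeader([]raftStatus{
		{Term: 7, IsLeader: true, StoreID: 1}, // stale leader at an older term
		{Term: 9, IsLeader: false, StoreID: 2},
		{Term: 9, IsLeader: true, StoreID: 3}, // leader at the latest term
	})) // 3
}

Resetting the candidate whenever a newer term appears is what lets the loop tolerate stale statuses from stores that have not yet caught up.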
-func TestDummyMultiTestContext(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - - ctx := context.Background() - cfg := kvserver.TestStoreConfig(nil) - cfg.TestingKnobs.DisableReplicateQueue = true - mtc := &multiTestContext{storeConfig: &cfg} - defer mtc.Stop() - mtc.Start(t, 3) - - key := []byte("a") - mtc.getRaftLeader(1) - - incArgs := incrementArgs(key, 5) - if _, err := kv.SendWrapped(ctx, mtc.Store(0).TestSender(), incArgs); err != nil { - t.Fatal(err) - } - - mtc.waitForValues(key, []int64{5, 0, 0}) - mtc.stopStore(1) - mtc.restartStore(1) -} diff --git a/pkg/kv/kvserver/helpers_test.go b/pkg/kv/kvserver/helpers_test.go index deb964182d95..a32673dc1a39 100644 --- a/pkg/kv/kvserver/helpers_test.go +++ b/pkg/kv/kvserver/helpers_test.go @@ -216,32 +216,6 @@ func (s *Store) RequestClosedTimestamp(nodeID roachpb.NodeID, rangeID roachpb.Ra s.cfg.ClosedTimestamp.Clients.Request(nodeID, rangeID) } -// AssertInvariants verifies that the store's bookkeping is self-consistent. It -// is only valid to call this method when there is no in-flight traffic to the -// store (e.g., after the store is shut down). -func (s *Store) AssertInvariants() { - s.mu.RLock() - defer s.mu.RUnlock() - s.mu.replicas.Range(func(_ int64, p unsafe.Pointer) bool { - ctx := s.cfg.AmbientCtx.AnnotateCtx(context.Background()) - repl := (*Replica)(p) - // We would normally need to hold repl.raftMu. Otherwise we can observe an - // initialized replica that is not in s.replicasByKey, e.g., if we race with - // a goroutine that is currently initializing repl. The lock ordering makes - // acquiring repl.raftMu challenging; instead we require that this method is - // called only when there is no in-flight traffic to the store, at which - // point acquiring repl.raftMu is unnecessary. - if repl.IsInitialized() { - if rbkRepl := s.mu.replicasByKey.LookupReplica(ctx, repl.Desc().StartKey); rbkRepl != repl { - log.Fatalf(ctx, "%v misplaced in replicasByKey; found %+v instead", repl, rbkRepl) - } - } else if _, ok := s.mu.uninitReplicas[repl.RangeID]; !ok { - log.Fatalf(ctx, "%v missing from uninitReplicas", repl) - } - return true // keep iterating - }) -} - func NewTestStorePool(cfg StoreConfig) *StorePool { TimeUntilStoreDead.Override(&cfg.Settings.SV, TestTimeUntilStoreDeadOff) return NewStorePool(