diff --git a/pkg/base/test_server_args.go b/pkg/base/test_server_args.go index 010f55a7ce1b..a3d264752cb0 100644 --- a/pkg/base/test_server_args.go +++ b/pkg/base/test_server_args.go @@ -12,6 +12,7 @@ package base import ( "context" + "net" "time" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -44,6 +45,13 @@ type TestServerArgs struct { // is always set to true when the server is started via a TestCluster. PartOfCluster bool + // Listener (if nonempty) is the listener to use for all incoming RPCs. + // If a listener is installed, it informs the RPC `Addr` used below. The + // Server itself knows to close it out. This is useful for when a test wants + // manual control over how the join flags (`JoinAddr`) are populated, and + // installs listeners manually to know which addresses to point to. + Listener net.Listener + // Addr (if nonempty) is the RPC address to use for the test server. Addr string // SQLAddr (if nonempty) is the SQL address to use for the test server. diff --git a/pkg/cmd/roachtest/tpchvec.go b/pkg/cmd/roachtest/tpchvec.go index 519f92bb7927..adcdb752c8a4 100644 --- a/pkg/cmd/roachtest/tpchvec.go +++ b/pkg/cmd/roachtest/tpchvec.go @@ -331,9 +331,18 @@ func (p *tpchVecPerfTest) postTestRunHook( // "catch" the slowness). for setupIdx, setup := range runConfig.clusterSetups { performClusterSetup(t, conn, setup) + // performClusterSetup has changed the cluster settings; + // however, the session variables might contain the old values, + // so we will open up new connections for each of the setups in + // order to get the correct cluster setup on each. + tempConn := c.Conn(ctx, 1) + defer tempConn.Close() + if _, err := tempConn.Exec("USE tpch;"); err != nil { + t.Fatal(err) + } for i := 0; i < runConfig.numRunsPerQuery; i++ { t.Status(fmt.Sprintf("\nRunning EXPLAIN ANALYZE (DEBUG) for setup=%s\n", runConfig.setupNames[setupIdx])) - rows, err := conn.Query(fmt.Sprintf( + rows, err := tempConn.Query(fmt.Sprintf( "EXPLAIN ANALYZE (DEBUG) %s;", tpch.QueriesByNumber[queryNum], )) if err != nil { diff --git a/pkg/server/connectivity_test.go b/pkg/server/connectivity_test.go new file mode 100644 index 000000000000..dc6fbe12fbf4 --- /dev/null +++ b/pkg/server/connectivity_test.go @@ -0,0 +1,334 @@ +// Copyright 2016 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package server_test + +import ( + "context" + "fmt" + "net" + "sort" + "sync" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/clusterversion" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/server/serverpb" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/cockroachdb/errors" + "google.golang.org/grpc" +) + +// TestClusterConnectivity sets up an uninitialized cluster with custom join +// flags (individual nodes point to specific others, instead of all pointing to +// n1), and tests that the cluster/node IDs are distributed correctly +// throughout. +func TestClusterConnectivity(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + // TODO(irfansharif): Teach TestServer to accept a list of join addresses + // instead of just one. + + var testConfigurations = []struct { + // bootstrapNode controls which node is `cockroach init`-ialized. + // Everything is 0-indexed. + bootstrapNode int + + // joinConfig[i] returns the node the i-th node is pointing to through + // its join flags. Everything is 0-indexed. + joinConfig []int + }{ + // 0. Every node points to the first, including the first. + {0, []int{0, 0, 0, 0, 0}}, + + // 1. Every node points to the previous, except the first, which points to + // itself. + // + // 0 <-- 1 <-- 2 <-- 3 <-- 4 + {0, []int{0, 0, 1, 2, 3}}, + + // 2. Same as previous, but a few links switched around. + // + // 0 <-- 2 <-- 1 <-- 3 <-- 4 + {0, []int{0, 2, 0, 1, 3}}, + + // 3. Introduce a bidirectional link. + // + // 1 <-> 2 <-- 0 <-- 3 + // 1 <-- 4 + {1, []int{2, 2, 1, 0, 1}}, + + // 4. Same as above, but bootstrap the other node in the bidirectional + // link. + // + // 1 <-> 2 <-- 0 <-- 3 + // 1 <-- 4 + {2, []int{2, 2, 1, 0, 1}}, + + // 5. Another topology centered around node 1, which itself is pointed + // to node 0. + // + // 0 <-> 1 <-- 2 + // 1 <-- 3 + {0, []int{1, 0, 1, 1}}, + + // 6. Same as above, but bootstrapping the centered node directly. + // + // 0 <-> 1 <-- 2 + // 1 <-- 3 + {1, []int{1, 0, 1, 1}}, + + // TODO(irfansharif): We would really like to be able to set up test + // clusters that are only partially connected, and assert that only + // nodes that are supposed to find out about bootstrap, actually do. + // Something like: + // + // 0 <-> 1 <-- 2 + // 5 <-- 4 <-- 3 <-- 5 + // + // A version of this was originally prototyped in #52526 but the changes + // required in Test{Cluster,Server} were too invasive to justify at the + // time. + } + + // getListener is a short hand to allocate a listener to an unbounded port. + getListener := func() net.Listener { + t.Helper() + + listener, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + return listener + } + baseServerArgs := base.TestServerArgs{ + // We're going to manually control initialization in this test. + NoAutoInitializeCluster: true, + StoreSpecs: []base.StoreSpec{{InMemory: true}}, + } + + for i, test := range testConfigurations { + t.Run(fmt.Sprintf("topology=%d", i), func(t *testing.T) { + numNodes := len(test.joinConfig) + var serverArgsPerNode = make(map[int]base.TestServerArgs) + + // We start off with installing a listener for each server. 
We + // pre-bind a listener so the kernel can go ahead and assign an + // address for us. We'll later use this address to populate join + // flags for neighboring nodes. + var listeners = make([]net.Listener, numNodes) + for i := 0; i < numNodes; i++ { + listener := getListener() + + serverArg := baseServerArgs + serverArg.Listener = listener + serverArg.Addr = listener.Addr().String() + serverArgsPerNode[i] = serverArg + listeners[i] = listener + } + + // We'll annotate the server args with the right join flags. + for i := 0; i < numNodes; i++ { + joinNode := test.joinConfig[i] + joinAddr := listeners[joinNode].Addr().String() + + serverArg := serverArgsPerNode[i] + serverArg.JoinAddr = joinAddr + serverArgsPerNode[i] = serverArg + } + + tcArgs := base.TestClusterArgs{ + // Saves time in this test. + ReplicationMode: base.ReplicationManual, + ServerArgsPerNode: serverArgsPerNode, + + // We have to start servers in parallel because we're looking to + // bootstrap the cluster manually in a separate thread. Each + // individual Server.Start is a blocking call (it waits for + // init). We want to start all of them in parallel to simulate a + // bunch of servers each waiting for init. + ParallelStart: true, + } + + // The test structure here is a bit convoluted, but necessary given + // the current implementation of TestCluster. TestCluster.Start + // wants to wait for all the nodes in the test cluster to be fully + // initialized before returning. Given we're testing initialization + // behavior, we do all the real work in a separate thread and keep + // the main thread limited to simply starting and stopping the test + // cluster. + // + // NB: That aside, TestCluster very much wants to live on the main + // goroutine running the test. That's mostly to do with its internal + // error handling and the limitations imposed by + // https://golang.org/pkg/testing/#T.FailNow (which sits underneath + // t.Fatal). + + tc := testcluster.NewTestCluster(t, numNodes, tcArgs) + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + + // Attempt to bootstrap the cluster through the configured node. + bootstrapNode := test.bootstrapNode + testutils.SucceedsSoon(t, func() (e error) { + ctx := context.Background() + serv := tc.Server(bootstrapNode) + + dialOpts, err := tc.Server(bootstrapNode).RPCContext().GRPCDialOptions() + if err != nil { + return err + } + + conn, err := grpc.DialContext(ctx, serv.ServingRPCAddr(), dialOpts...) + if err != nil { + return err + } + defer func() { + _ = conn.Close() + }() + + client := serverpb.NewInitClient(conn) + _, err = client.Bootstrap(context.Background(), &serverpb.BootstrapRequest{}) + return err + }) + + // Wait to get a real cluster ID (doesn't always get populated + // right after bootstrap). + testutils.SucceedsSoon(t, func() error { + clusterID := tc.Server(bootstrapNode).ClusterID() + if clusterID.Equal(uuid.UUID{}) { + return errors.New("cluster ID still not recorded") + } + return nil + }) + + clusterID := tc.Server(bootstrapNode).ClusterID() + testutils.SucceedsSoon(t, func() error { + var nodeIDs []roachpb.NodeID + var storeIDs []roachpb.StoreID + + // Sanity check that all the nodes we expect to join this + // network actually do (by checking they discover the right + // cluster ID). Also collect node/store IDs for below. 
+ for i := 0; i < numNodes; i++ { + if got := tc.Server(i).ClusterID(); got != clusterID { + return errors.Newf("mismatched cluster IDs; %s (for node %d) != %s (for node %d)", + clusterID.String(), bootstrapNode, got.String(), i) + } + + nodeIDs = append(nodeIDs, tc.Server(i).NodeID()) + storeIDs = append(storeIDs, tc.Server(i).GetFirstStoreID()) + } + + sort.Slice(nodeIDs, func(i, j int) bool { + return nodeIDs[i] < nodeIDs[j] + }) + sort.Slice(storeIDs, func(i, j int) bool { + return storeIDs[i] < storeIDs[j] + }) + + // Double check that we have the full set of node/store IDs + // we expect. + for i := 1; i <= len(nodeIDs); i++ { + expNodeID := roachpb.NodeID(i) + if got := nodeIDs[i-1]; got != expNodeID { + return errors.Newf("unexpected node ID; expected %s, got %s", expNodeID.String(), got.String()) + } + + expStoreID := roachpb.StoreID(i) + if got := storeIDs[i-1]; got != expStoreID { + return errors.Newf("unexpected store ID; expected %s, got %s", expStoreID.String(), got.String()) + } + } + + return nil + }) + }() + + // Start the test cluster. This is a blocking call, and expects the + // configured number of servers in the cluster to be fully + // initialized before it returns. Given that the initialization + // happens in the other thread, we'll only get past it after having + // bootstrapped the test cluster in the thread above. + tc.Start(t) + defer tc.Stopper().Stop(context.Background()) + + wg.Wait() + }) + } +} + +// TestJoinVersionGate checks to see that improperly versioned cockroach nodes +// are not able to join a cluster. +func TestJoinVersionGate(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + commonArg := base.TestServerArgs{ + StoreSpecs: []base.StoreSpec{ + {InMemory: true}, + }, + } + + numNodes := 3 + tcArgs := base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, // Saves time in this test. + ServerArgs: commonArg, + ParallelStart: true, + } + + tc := testcluster.StartTestCluster(t, numNodes, tcArgs) + defer tc.Stopper().Stop(context.Background()) + + testutils.SucceedsSoon(t, func() error { + for i := 0; i < numNodes; i++ { + clusterID := tc.Server(0).ClusterID() + got := tc.Server(i).ClusterID() + + if got != clusterID { + return errors.Newf("mismatched cluster IDs; %s (for node %d) != %s (for node %d)", clusterID.String(), 0, got.String(), i) + } + } + return nil + }) + + var newVersion = clusterversion.TestingBinaryVersion + var oldVersion = prev(newVersion) + + knobs := base.TestingKnobs{ + Server: &server.TestingKnobs{ + BinaryVersionOverride: oldVersion, + }, + } + + oldVersionServerArgs := commonArg + oldVersionServerArgs.Knobs = knobs + oldVersionServerArgs.JoinAddr = tc.Servers[0].ServingRPCAddr() + + serv, err := tc.AddServer(oldVersionServerArgs) + if err != nil { + t.Fatal(err) + } + defer serv.Stop() + + if err := serv.Start(); !errors.Is(errors.Cause(err), server.ErrIncompatibleBinaryVersion) { + t.Fatalf("expected error %s, got %v", server.ErrIncompatibleBinaryVersion.Error(), err.Error()) + } +} diff --git a/pkg/server/testserver.go b/pkg/server/testserver.go index 70930ba426e0..16bf7f526bef 100644 --- a/pkg/server/testserver.go +++ b/pkg/server/testserver.go @@ -292,6 +292,11 @@ func (ts *TestServer) Node() interface{} { return ts.node } +// NodeID returns the ID of this node within its cluster. +func (ts *TestServer) NodeID() roachpb.NodeID { + return ts.rpcContext.NodeID.Get() +} + // Stopper returns the embedded server's Stopper. 
func (ts *TestServer) Stopper() *stop.Stopper { return ts.stopper diff --git a/pkg/sql/opt/partialidx/implicator.go b/pkg/sql/opt/partialidx/implicator.go index 090721d39349..fbda9407d705 100644 --- a/pkg/sql/opt/partialidx/implicator.go +++ b/pkg/sql/opt/partialidx/implicator.go @@ -145,7 +145,6 @@ func (im *Implicator) Init(f *norm.Factory, md *opt.Metadata, evalCtx *tree.Eval im.f = f im.md = md im.evalCtx = evalCtx - im.constraintCache = make(map[opt.ScalarExpr]constraintCacheItem) } // FiltersImplyPredicate attempts to prove that a partial index predicate is @@ -647,9 +646,18 @@ func (im *Implicator) twoVarComparisonImpliesTwoVarComparison( return false, true } +// initConstraintCache initializes the constraintCache field if it has not yet +// been initialized. +func (im *Implicator) initConstraintCache() { + if im.constraintCache == nil { + im.constraintCache = make(map[opt.ScalarExpr]constraintCacheItem) + } +} + // cacheConstraint caches a constraint set and a tight boolean for the given // scalar expression. func (im *Implicator) cacheConstraint(e opt.ScalarExpr, c *constraint.Set, tight bool) { + im.initConstraintCache() if _, ok := im.constraintCache[e]; !ok { im.constraintCache[e] = constraintCacheItem{ c: c, @@ -662,6 +670,7 @@ func (im *Implicator) cacheConstraint(e opt.ScalarExpr, c *constraint.Set, tight // cache contains an entry for the given scalar expression. It returns // ok = false if the scalar expression does not exist in the cache. func (im *Implicator) fetchConstraint(e opt.ScalarExpr) (_ *constraint.Set, tight bool, ok bool) { + im.initConstraintCache() if res, ok := im.constraintCache[e]; ok { return res.c, res.tight, true } diff --git a/pkg/sql/opt/xform/optimizer.go b/pkg/sql/opt/xform/optimizer.go index 1f25de55acc7..1ce78755c03c 100644 --- a/pkg/sql/opt/xform/optimizer.go +++ b/pkg/sql/opt/xform/optimizer.go @@ -95,7 +95,7 @@ type Optimizer struct { disabledRules RuleSet // JoinOrderBuilder adds new join orderings to the memo. - jb *JoinOrderBuilder + jb JoinOrderBuilder } // Init initializes the Optimizer with a new, blank memo structure inside. This @@ -112,7 +112,7 @@ func (o *Optimizer) Init(evalCtx *tree.EvalContext, catalog cat.Catalog) { o.matchedRule = nil o.appliedRule = nil o.disabledRules = util.FastIntSet{} - o.jb = &JoinOrderBuilder{} + o.jb = JoinOrderBuilder{} if evalCtx.TestingKnobs.DisableOptimizerRuleProbability > 0 { o.disableRules(evalCtx.TestingKnobs.DisableOptimizerRuleProbability) } @@ -151,7 +151,7 @@ func (o *Optimizer) SetCoster(coster Coster) { // JoinOrderBuilder returns the JoinOrderBuilder instance that the optimizer is // currently using to reorder join trees. func (o *Optimizer) JoinOrderBuilder() *JoinOrderBuilder { - return o.jb + return &o.jb } // DisableOptimizations disables all transformation rules, including normalize diff --git a/pkg/testutils/serverutils/test_cluster_shim.go b/pkg/testutils/serverutils/test_cluster_shim.go index d8a599c66593..6b3ab1ec436d 100644 --- a/pkg/testutils/serverutils/test_cluster_shim.go +++ b/pkg/testutils/serverutils/test_cluster_shim.go @@ -30,7 +30,7 @@ import ( type TestClusterInterface interface { // Start is used to start up the servers that were instantiated when // creating this cluster. - Start(t testing.TB, args base.TestClusterArgs) + Start(t testing.TB) // NumServers returns the number of servers this test cluster is configured // with. 
@@ -144,7 +144,7 @@ func StartNewTestCluster( t testing.TB, numNodes int, args base.TestClusterArgs, ) TestClusterInterface { cluster := NewTestCluster(t, numNodes, args) - cluster.Start(t, args) + cluster.Start(t) return cluster } diff --git a/pkg/testutils/serverutils/test_server_shim.go b/pkg/testutils/serverutils/test_server_shim.go index 39edd305316d..87eaad152d84 100644 --- a/pkg/testutils/serverutils/test_server_shim.go +++ b/pkg/testutils/serverutils/test_server_shim.go @@ -35,6 +35,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/httputil" "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/cockroach/pkg/util/uuid" ) // TestServerInterface defines test server functionality that tests need; it is @@ -50,6 +51,10 @@ type TestServerInterface interface { // NodeID returns the ID of this node within its cluster. NodeID() roachpb.NodeID + // ClusterID returns the cluster ID as understood by this node in the + // cluster. + ClusterID() uuid.UUID + // ServingRPCAddr returns the server's advertised address. ServingRPCAddr() string diff --git a/pkg/testutils/testcluster/testcluster.go b/pkg/testutils/testcluster/testcluster.go index 7de3b28c9bd6..0d850ecd61d6 100644 --- a/pkg/testutils/testcluster/testcluster.go +++ b/pkg/testutils/testcluster/testcluster.go @@ -44,18 +44,19 @@ import ( ) // TestCluster represents a set of TestServers. The hope is that it can be used -// analoguous to TestServer, but with control over range replication. +// analogous to TestServer, but with control over range replication and join +// flags. type TestCluster struct { Servers []*server.TestServer Conns []*gosql.DB stopper *stop.Stopper - replicationMode base.TestClusterReplicationMode scratchRangeKey roachpb.Key mu struct { syncutil.Mutex serverStoppers []*stop.Stopper } - serverArgs []base.TestServerArgs + serverArgs []base.TestServerArgs + clusterArgs base.TestClusterArgs } var _ serverutils.TestClusterInterface = &TestCluster{} @@ -126,61 +127,54 @@ func (tc *TestCluster) StopServer(idx int) { // The cluster should be stopped using TestCluster.Stopper().Stop(). func StartTestCluster(t testing.TB, nodes int, args base.TestClusterArgs) *TestCluster { cluster := NewTestCluster(t, nodes, args) - cluster.Start(t, args) + cluster.Start(t) return cluster } // NewTestCluster initializes a TestCluster made up of `nodes` in-memory testing // servers. It needs to be started separately using the return type. -func NewTestCluster(t testing.TB, nodes int, args base.TestClusterArgs) *TestCluster { +func NewTestCluster(t testing.TB, nodes int, clusterArgs base.TestClusterArgs) *TestCluster { if nodes < 1 { t.Fatal("invalid cluster size: ", nodes) } if err := checkServerArgsForCluster( - args.ServerArgs, args.ReplicationMode, disallowJoinAddr, + clusterArgs.ServerArgs, clusterArgs.ReplicationMode, disallowJoinAddr, ); err != nil { t.Fatal(err) } - for _, sargs := range args.ServerArgsPerNode { + for _, sargs := range clusterArgs.ServerArgsPerNode { if err := checkServerArgsForCluster( - sargs, args.ReplicationMode, disallowJoinAddr, + sargs, clusterArgs.ReplicationMode, allowJoinAddr, ); err != nil { t.Fatal(err) } } tc := &TestCluster{ - stopper: stop.NewStopper(), - replicationMode: args.ReplicationMode, + stopper: stop.NewStopper(), + clusterArgs: clusterArgs, } // Check if any of the args have a locality set. 
noLocalities := true - for _, arg := range args.ServerArgsPerNode { + for _, arg := range tc.clusterArgs.ServerArgsPerNode { if len(arg.Locality.Tiers) > 0 { noLocalities = false break } } - if len(args.ServerArgs.Locality.Tiers) > 0 { + if len(tc.clusterArgs.ServerArgs.Locality.Tiers) > 0 { noLocalities = false } - // Pre-bind a listener for node zero so the kernel can go ahead and - // assign its address for use in the other nodes' join flags. - // The Server becomes responsible for closing this. - firstListener, err := net.Listen("tcp", "127.0.0.1:0") - if err != nil { - t.Fatal(err) - } - + var firstListener net.Listener for i := 0; i < nodes; i++ { var serverArgs base.TestServerArgs - if perNodeServerArgs, ok := args.ServerArgsPerNode[i]; ok { + if perNodeServerArgs, ok := tc.clusterArgs.ServerArgsPerNode[i]; ok { serverArgs = perNodeServerArgs } else { - serverArgs = args.ServerArgs + serverArgs = tc.clusterArgs.ServerArgs } // If no localities are specified in the args, we'll generate some @@ -194,18 +188,25 @@ func NewTestCluster(t testing.TB, nodes int, args base.TestClusterArgs) *TestClu } if i == 0 { - if serverArgs.Knobs.Server == nil { - serverArgs.Knobs.Server = &server.TestingKnobs{} + if serverArgs.Listener != nil { + // If the test installed a listener for us, use that. + firstListener = serverArgs.Listener } else { - // Copy the knobs so the struct with the listener is not - // reused for other nodes. - knobs := *serverArgs.Knobs.Server.(*server.TestingKnobs) - serverArgs.Knobs.Server = &knobs + // Pre-bind a listener for node zero so the kernel can go ahead and + // assign its address for use in the other nodes' join flags. + // The Server becomes responsible for closing this. + listener, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + firstListener = listener + serverArgs.Listener = listener } - serverArgs.Knobs.Server.(*server.TestingKnobs).RPCListener = firstListener - serverArgs.Addr = firstListener.Addr().String() } else { - serverArgs.JoinAddr = firstListener.Addr().String() + if serverArgs.JoinAddr == "" { + // Point to the first listener unless told explicitly otherwise. + serverArgs.JoinAddr = firstListener.Addr().String() + } serverArgs.NoAutoInitializeCluster = true } @@ -224,10 +225,10 @@ func NewTestCluster(t testing.TB, nodes int, args base.TestClusterArgs) *TestClu // If looking to test initialization/bootstrap behavior, Start should be invoked // in a separate thread and with ParallelStart enabled (otherwise it'll block // on waiting for init for the first server). 
-func (tc *TestCluster) Start(t testing.TB, args base.TestClusterArgs) { +func (tc *TestCluster) Start(t testing.TB) { nodes := len(tc.Servers) var errCh chan error - if args.ParallelStart { + if tc.clusterArgs.ParallelStart { errCh = make(chan error, nodes) } @@ -238,7 +239,7 @@ func (tc *TestCluster) Start(t testing.TB, args base.TestClusterArgs) { disableLBS = true } - if args.ParallelStart { + if tc.clusterArgs.ParallelStart { go func(i int) { errCh <- tc.StartServer(t, tc.Server(i), tc.serverArgs[i]) }(i) @@ -253,7 +254,7 @@ func (tc *TestCluster) Start(t testing.TB, args base.TestClusterArgs) { } } - if args.ParallelStart { + if tc.clusterArgs.ParallelStart { for i := 0; i < nodes; i++ { if err := <-errCh; err != nil { t.Fatal(err) @@ -263,7 +264,7 @@ func (tc *TestCluster) Start(t testing.TB, args base.TestClusterArgs) { tc.WaitForNStores(t, tc.NumServers(), tc.Servers[0].Gossip()) } - if tc.replicationMode == base.ReplicationManual { + if tc.clusterArgs.ReplicationMode == base.ReplicationManual { // We've already disabled the merge queue via testing knobs above, but ALTER // TABLE ... SPLIT AT will throw an error unless we also disable merges via // the cluster setting. @@ -285,7 +286,7 @@ func (tc *TestCluster) Start(t testing.TB, args base.TestClusterArgs) { // cluster stopper is stopped. tc.stopper.AddCloser(stop.CloserFn(tc.stopServers)) - if tc.replicationMode == base.ReplicationAuto { + if tc.clusterArgs.ReplicationMode == base.ReplicationAuto { if err := tc.WaitForFullReplication(); err != nil { t.Fatal(err) } @@ -359,7 +360,7 @@ func (tc *TestCluster) AddServer(serverArgs base.TestServerArgs) (*server.TestSe // started, in which case the check has not been performed. if err := checkServerArgsForCluster( serverArgs, - tc.replicationMode, + tc.clusterArgs.ReplicationMode, // Allow JoinAddr here; servers being added after the TestCluster has been // started should have a JoinAddr filled in at this point. allowJoinAddr, @@ -367,7 +368,7 @@ func (tc *TestCluster) AddServer(serverArgs base.TestServerArgs) (*server.TestSe return nil, err } serverArgs.Stopper = stop.NewStopper() - if tc.replicationMode == base.ReplicationManual { + if tc.clusterArgs.ReplicationMode == base.ReplicationManual { var stkCopy kvserver.StoreTestingKnobs if stk := serverArgs.Knobs.Store; stk != nil { stkCopy = *stk.(*kvserver.StoreTestingKnobs) @@ -378,6 +379,23 @@ func (tc *TestCluster) AddServer(serverArgs base.TestServerArgs) (*server.TestSe serverArgs.Knobs.Store = &stkCopy } + // Install listener, if non-empty. + if serverArgs.Listener != nil { + // Instantiate the server testing knobs if non-empty. + if serverArgs.Knobs.Server == nil { + serverArgs.Knobs.Server = &server.TestingKnobs{} + } else { + // Copy the knobs so the struct with the listener is not + // reused for other nodes. + knobs := *serverArgs.Knobs.Server.(*server.TestingKnobs) + serverArgs.Knobs.Server = &knobs + } + + // Install the provided listener. + serverArgs.Knobs.Server.(*server.TestingKnobs).RPCListener = serverArgs.Listener + serverArgs.Addr = serverArgs.Listener.Addr().String() + } + s := serverutils.NewServer(serverArgs).(*server.TestServer) tc.Servers = append(tc.Servers, s) @@ -917,7 +935,7 @@ func (tc *TestCluster) WaitForNodeLiveness(t testing.TB) { // ReplicationMode implements TestClusterInterface. func (tc *TestCluster) ReplicationMode() base.TestClusterReplicationMode { - return tc.replicationMode + return tc.clusterArgs.ReplicationMode } // ToggleReplicateQueues activates or deactivates the replication queues on all
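
For reference, here is a minimal sketch (not part of the patch) of how the pieces introduced above compose: the new base.TestServerArgs.Listener field lets a test pre-bind listeners so each node's JoinAddr can point at a specific neighbor, NewTestCluster now captures the cluster args, and Start takes only the testing.TB. The helper name startChainCluster and the three-node chain topology are illustrative assumptions; the API calls mirror the ones TestClusterConnectivity uses in this diff.

package server_test

import (
	"context"
	"net"
	"testing"

	"github.com/cockroachdb/cockroach/pkg/base"
	"github.com/cockroachdb/cockroach/pkg/server/serverpb"
	"github.com/cockroachdb/cockroach/pkg/testutils"
	"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
	"google.golang.org/grpc"
)

// startChainCluster (hypothetical helper) starts a three-node cluster wired
// as 0 <-- 1 <-- 2 and bootstraps it through node 0.
func startChainCluster(t *testing.T) *testcluster.TestCluster {
	// Pre-bind one listener per node so every node's address is known before
	// any server starts; the join flags below point at these addresses.
	listeners := make([]net.Listener, 3)
	for i := range listeners {
		l, err := net.Listen("tcp", "127.0.0.1:0")
		if err != nil {
			t.Fatal(err)
		}
		listeners[i] = l
	}

	// Node 0 joins itself, node i joins node i-1.
	perNode := make(map[int]base.TestServerArgs)
	for i, l := range listeners {
		join := i - 1
		if join < 0 {
			join = 0
		}
		perNode[i] = base.TestServerArgs{
			NoAutoInitializeCluster: true, // we drive the Bootstrap RPC ourselves below
			StoreSpecs:              []base.StoreSpec{{InMemory: true}},
			Listener:                l,                 // the server takes ownership and closes it
			Addr:                    l.Addr().String(), // RPC address taken from the listener
			JoinAddr:                listeners[join].Addr().String(),
		}
	}

	tc := testcluster.NewTestCluster(t, len(listeners), base.TestClusterArgs{
		ReplicationMode:   base.ReplicationManual,
		ServerArgsPerNode: perNode,
		// Every server blocks in Start until the cluster is initialized, so
		// they all have to come up in parallel.
		ParallelStart: true,
	})

	// Run the equivalent of `cockroach init` against node 0 while Start blocks
	// waiting for initialization, as TestClusterConnectivity does.
	go func() {
		testutils.SucceedsSoon(t, func() error {
			ctx := context.Background()
			serv := tc.Server(0)
			dialOpts, err := serv.RPCContext().GRPCDialOptions()
			if err != nil {
				return err
			}
			conn, err := grpc.DialContext(ctx, serv.ServingRPCAddr(), dialOpts...)
			if err != nil {
				return err
			}
			defer func() { _ = conn.Close() }()
			_, err = serverpb.NewInitClient(conn).Bootstrap(ctx, &serverpb.BootstrapRequest{})
			return err
		})
	}()

	// Start takes only the testing.TB now; the cluster args were captured by
	// NewTestCluster above.
	tc.Start(t)
	return tc
}

A test would then call tc := startChainCluster(t), defer tc.Stopper().Stop(context.Background()), and assert on the resulting cluster and node IDs, much like TestClusterConnectivity does above.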