From 1e9faf217bea904c35d596c5c321ed9a94c573cc Mon Sep 17 00:00:00 2001
From: irfan sharif
Date: Fri, 4 Sep 2020 17:12:03 -0400
Subject: [PATCH 1/7] testutils: fold replicationMode into TestClusterArgs

And no need to require callers to re-supply it when starting up the cluster
(we already need it when we create it). Just a drive-by cleanup.

Release justification: non-production code changes

Release note: None
---
 .../serverutils/test_cluster_shim.go     |  4 +-
 pkg/testutils/testcluster/testcluster.go | 46 +++++++++----------
 2 files changed, 25 insertions(+), 25 deletions(-)

diff --git a/pkg/testutils/serverutils/test_cluster_shim.go b/pkg/testutils/serverutils/test_cluster_shim.go
index d8a599c66593..6b3ab1ec436d 100644
--- a/pkg/testutils/serverutils/test_cluster_shim.go
+++ b/pkg/testutils/serverutils/test_cluster_shim.go
@@ -30,7 +30,7 @@ import (
 type TestClusterInterface interface {
 	// Start is used to start up the servers that were instantiated when
 	// creating this cluster.
-	Start(t testing.TB, args base.TestClusterArgs)
+	Start(t testing.TB)
 
 	// NumServers returns the number of servers this test cluster is configured
 	// with.
@@ -144,7 +144,7 @@ func StartNewTestCluster(
 	t testing.TB, numNodes int, args base.TestClusterArgs,
 ) TestClusterInterface {
 	cluster := NewTestCluster(t, numNodes, args)
-	cluster.Start(t, args)
+	cluster.Start(t)
 	return cluster
 }
 
diff --git a/pkg/testutils/testcluster/testcluster.go b/pkg/testutils/testcluster/testcluster.go
index 7de3b28c9bd6..3a93f838f0e3 100644
--- a/pkg/testutils/testcluster/testcluster.go
+++ b/pkg/testutils/testcluster/testcluster.go
@@ -44,18 +44,18 @@ import (
 )
 
 // TestCluster represents a set of TestServers. The hope is that it can be used
-// analoguous to TestServer, but with control over range replication.
+// analogous to TestServer, but with control over range replication.
 type TestCluster struct {
 	Servers         []*server.TestServer
 	Conns           []*gosql.DB
 	stopper         *stop.Stopper
-	replicationMode base.TestClusterReplicationMode
 	scratchRangeKey roachpb.Key
 	mu              struct {
 		syncutil.Mutex
 		serverStoppers []*stop.Stopper
 	}
-	serverArgs []base.TestServerArgs
+	serverArgs  []base.TestServerArgs
+	clusterArgs base.TestClusterArgs
 }
 
 var _ serverutils.TestClusterInterface = &TestCluster{}
@@ -126,44 +126,44 @@ func (tc *TestCluster) StopServer(idx int) {
 // The cluster should be stopped using TestCluster.Stopper().Stop().
 func StartTestCluster(t testing.TB, nodes int, args base.TestClusterArgs) *TestCluster {
 	cluster := NewTestCluster(t, nodes, args)
-	cluster.Start(t, args)
+	cluster.Start(t)
 	return cluster
 }
 
 // NewTestCluster initializes a TestCluster made up of `nodes` in-memory testing
 // servers. It needs to be started separately using the return type.
-func NewTestCluster(t testing.TB, nodes int, args base.TestClusterArgs) *TestCluster { +func NewTestCluster(t testing.TB, nodes int, clusterArgs base.TestClusterArgs) *TestCluster { if nodes < 1 { t.Fatal("invalid cluster size: ", nodes) } if err := checkServerArgsForCluster( - args.ServerArgs, args.ReplicationMode, disallowJoinAddr, + clusterArgs.ServerArgs, clusterArgs.ReplicationMode, disallowJoinAddr, ); err != nil { t.Fatal(err) } - for _, sargs := range args.ServerArgsPerNode { + for _, sargs := range clusterArgs.ServerArgsPerNode { if err := checkServerArgsForCluster( - sargs, args.ReplicationMode, disallowJoinAddr, + sargs, clusterArgs.ReplicationMode, allowJoinAddr, ); err != nil { t.Fatal(err) } } tc := &TestCluster{ - stopper: stop.NewStopper(), - replicationMode: args.ReplicationMode, + stopper: stop.NewStopper(), + clusterArgs: clusterArgs, } // Check if any of the args have a locality set. noLocalities := true - for _, arg := range args.ServerArgsPerNode { + for _, arg := range tc.clusterArgs.ServerArgsPerNode { if len(arg.Locality.Tiers) > 0 { noLocalities = false break } } - if len(args.ServerArgs.Locality.Tiers) > 0 { + if len(tc.clusterArgs.ServerArgs.Locality.Tiers) > 0 { noLocalities = false } @@ -177,10 +177,10 @@ func NewTestCluster(t testing.TB, nodes int, args base.TestClusterArgs) *TestClu for i := 0; i < nodes; i++ { var serverArgs base.TestServerArgs - if perNodeServerArgs, ok := args.ServerArgsPerNode[i]; ok { + if perNodeServerArgs, ok := tc.clusterArgs.ServerArgsPerNode[i]; ok { serverArgs = perNodeServerArgs } else { - serverArgs = args.ServerArgs + serverArgs = tc.clusterArgs.ServerArgs } // If no localities are specified in the args, we'll generate some @@ -224,10 +224,10 @@ func NewTestCluster(t testing.TB, nodes int, args base.TestClusterArgs) *TestClu // If looking to test initialization/bootstrap behavior, Start should be invoked // in a separate thread and with ParallelStart enabled (otherwise it'll block // on waiting for init for the first server). -func (tc *TestCluster) Start(t testing.TB, args base.TestClusterArgs) { +func (tc *TestCluster) Start(t testing.TB) { nodes := len(tc.Servers) var errCh chan error - if args.ParallelStart { + if tc.clusterArgs.ParallelStart { errCh = make(chan error, nodes) } @@ -238,7 +238,7 @@ func (tc *TestCluster) Start(t testing.TB, args base.TestClusterArgs) { disableLBS = true } - if args.ParallelStart { + if tc.clusterArgs.ParallelStart { go func(i int) { errCh <- tc.StartServer(t, tc.Server(i), tc.serverArgs[i]) }(i) @@ -253,7 +253,7 @@ func (tc *TestCluster) Start(t testing.TB, args base.TestClusterArgs) { } } - if args.ParallelStart { + if tc.clusterArgs.ParallelStart { for i := 0; i < nodes; i++ { if err := <-errCh; err != nil { t.Fatal(err) @@ -263,7 +263,7 @@ func (tc *TestCluster) Start(t testing.TB, args base.TestClusterArgs) { tc.WaitForNStores(t, tc.NumServers(), tc.Servers[0].Gossip()) } - if tc.replicationMode == base.ReplicationManual { + if tc.clusterArgs.ReplicationMode == base.ReplicationManual { // We've already disabled the merge queue via testing knobs above, but ALTER // TABLE ... SPLIT AT will throw an error unless we also disable merges via // the cluster setting. @@ -285,7 +285,7 @@ func (tc *TestCluster) Start(t testing.TB, args base.TestClusterArgs) { // cluster stopper is stopped. 
tc.stopper.AddCloser(stop.CloserFn(tc.stopServers)) - if tc.replicationMode == base.ReplicationAuto { + if tc.clusterArgs.ReplicationMode == base.ReplicationAuto { if err := tc.WaitForFullReplication(); err != nil { t.Fatal(err) } @@ -359,7 +359,7 @@ func (tc *TestCluster) AddServer(serverArgs base.TestServerArgs) (*server.TestSe // started, in which case the check has not been performed. if err := checkServerArgsForCluster( serverArgs, - tc.replicationMode, + tc.clusterArgs.ReplicationMode, // Allow JoinAddr here; servers being added after the TestCluster has been // started should have a JoinAddr filled in at this point. allowJoinAddr, @@ -367,7 +367,7 @@ func (tc *TestCluster) AddServer(serverArgs base.TestServerArgs) (*server.TestSe return nil, err } serverArgs.Stopper = stop.NewStopper() - if tc.replicationMode == base.ReplicationManual { + if tc.clusterArgs.ReplicationMode == base.ReplicationManual { var stkCopy kvserver.StoreTestingKnobs if stk := serverArgs.Knobs.Store; stk != nil { stkCopy = *stk.(*kvserver.StoreTestingKnobs) @@ -917,7 +917,7 @@ func (tc *TestCluster) WaitForNodeLiveness(t testing.TB) { // ReplicationMode implements TestClusterInterface. func (tc *TestCluster) ReplicationMode() base.TestClusterReplicationMode { - return tc.replicationMode + return tc.clusterArgs.ReplicationMode } // ToggleReplicateQueues activates or deactivates the replication queues on all From c36463e5b071ae5ada393454f7b310da43924ac2 Mon Sep 17 00:00:00 2001 From: irfan sharif Date: Fri, 4 Sep 2020 17:44:28 -0400 Subject: [PATCH 2/7] base,testcluster: allow installation of Listeners in TestCluster We pre-define a firstListener when creating a TestCluster in order to have the address accessible to other connecting servers that want to set up appropriate join flags. We generalize this idea to allow callers to install Listeners on a per server basis in order to set up arbitrary join flag "links" between nodes. We'll make use of this ability in a future test. Release note: None --- pkg/base/test_server_args.go | 8 ++++ pkg/testutils/testcluster/testcluster.go | 54 ++++++++++++++++-------- 2 files changed, 44 insertions(+), 18 deletions(-) diff --git a/pkg/base/test_server_args.go b/pkg/base/test_server_args.go index 010f55a7ce1b..a3d264752cb0 100644 --- a/pkg/base/test_server_args.go +++ b/pkg/base/test_server_args.go @@ -12,6 +12,7 @@ package base import ( "context" + "net" "time" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -44,6 +45,13 @@ type TestServerArgs struct { // is always set to true when the server is started via a TestCluster. PartOfCluster bool + // Listener (if nonempty) is the listener to use for all incoming RPCs. + // If a listener is installed, it informs the RPC `Addr` used below. The + // Server itself knows to close it out. This is useful for when a test wants + // manual control over how the join flags (`JoinAddr`) are populated, and + // installs listeners manually to know which addresses to point to. + Listener net.Listener + // Addr (if nonempty) is the RPC address to use for the test server. Addr string // SQLAddr (if nonempty) is the SQL address to use for the test server. diff --git a/pkg/testutils/testcluster/testcluster.go b/pkg/testutils/testcluster/testcluster.go index 3a93f838f0e3..0d850ecd61d6 100644 --- a/pkg/testutils/testcluster/testcluster.go +++ b/pkg/testutils/testcluster/testcluster.go @@ -44,7 +44,8 @@ import ( ) // TestCluster represents a set of TestServers. 
The hope is that it can be used
-// analogous to TestServer, but with control over range replication.
+// analogous to TestServer, but with control over range replication and join
+// flags.
 type TestCluster struct {
 	Servers         []*server.TestServer
 	Conns           []*gosql.DB
 	stopper         *stop.Stopper
@@ -167,14 +168,7 @@ func NewTestCluster(t testing.TB, nodes int, clusterArgs base.TestClusterArgs) *
 		noLocalities = false
 	}
 
-	// Pre-bind a listener for node zero so the kernel can go ahead and
-	// assign its address for use in the other nodes' join flags.
-	// The Server becomes responsible for closing this.
-	firstListener, err := net.Listen("tcp", "127.0.0.1:0")
-	if err != nil {
-		t.Fatal(err)
-	}
-
+	var firstListener net.Listener
 	for i := 0; i < nodes; i++ {
 		var serverArgs base.TestServerArgs
 		if perNodeServerArgs, ok := tc.clusterArgs.ServerArgsPerNode[i]; ok {
@@ -194,18 +188,25 @@ func NewTestCluster(t testing.TB, nodes int, clusterArgs base.TestClusterArgs) *
 		}
 
 		if i == 0 {
-			if serverArgs.Knobs.Server == nil {
-				serverArgs.Knobs.Server = &server.TestingKnobs{}
+			if serverArgs.Listener != nil {
+				// If the test installed a listener for us, use that.
+				firstListener = serverArgs.Listener
 			} else {
-				// Copy the knobs so the struct with the listener is not
-				// reused for other nodes.
-				knobs := *serverArgs.Knobs.Server.(*server.TestingKnobs)
-				serverArgs.Knobs.Server = &knobs
+				// Pre-bind a listener for node zero so the kernel can go ahead and
+				// assign its address for use in the other nodes' join flags.
+				// The Server becomes responsible for closing this.
+				listener, err := net.Listen("tcp", "127.0.0.1:0")
+				if err != nil {
+					t.Fatal(err)
+				}
+				firstListener = listener
+				serverArgs.Listener = listener
 			}
-			serverArgs.Knobs.Server.(*server.TestingKnobs).RPCListener = firstListener
-			serverArgs.Addr = firstListener.Addr().String()
 		} else {
-			serverArgs.JoinAddr = firstListener.Addr().String()
+			if serverArgs.JoinAddr == "" {
+				// Point to the first listener unless told explicitly otherwise.
+				serverArgs.JoinAddr = firstListener.Addr().String()
+			}
 			serverArgs.NoAutoInitializeCluster = true
 		}
 
@@ -378,6 +379,23 @@ func (tc *TestCluster) AddServer(serverArgs base.TestServerArgs) (*server.TestSe
 		serverArgs.Knobs.Store = &stkCopy
 	}
 
+	// Install listener, if non-empty.
+	if serverArgs.Listener != nil {
+		// Instantiate the server testing knobs if non-empty.
+		if serverArgs.Knobs.Server == nil {
+			serverArgs.Knobs.Server = &server.TestingKnobs{}
+		} else {
+			// Copy the knobs so the struct with the listener is not
+			// reused for other nodes.
+			knobs := *serverArgs.Knobs.Server.(*server.TestingKnobs)
+			serverArgs.Knobs.Server = &knobs
+		}
+
+		// Install the provided listener.
+		serverArgs.Knobs.Server.(*server.TestingKnobs).RPCListener = serverArgs.Listener
+		serverArgs.Addr = serverArgs.Listener.Addr().String()
+	}
+
 	s := serverutils.NewServer(serverArgs).(*server.TestServer)
 	tc.Servers = append(tc.Servers, s)

From 5644528d12ffb54bbde40ec756b138870cd660db Mon Sep 17 00:00:00 2001
From: irfan sharif
Date: Wed, 9 Sep 2020 11:48:15 -0400
Subject: [PATCH 3/7] server: fetch TestServer node ID from rpcContext instead

In a future commit we'll add a test that wants to read the node ID of a given
server concurrently with it being set. This is because the test exercises
initialization code paths where we'll start a test server in parallel with
inspecting its state. If we consult the descriptor when retrieving the node
ID, it's a data race (there's no mutex protecting its access; it was never
needed).
The rpcContext node ID container, however, is an atomic value, so it's safe
for concurrent access. They're both set at the same points in the server
startup codepaths, so it's safe to swap one out for the other.

Release note: None
---
 pkg/server/testserver.go | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/pkg/server/testserver.go b/pkg/server/testserver.go
index 3d41f08ac934..b73c81687dd6 100644
--- a/pkg/server/testserver.go
+++ b/pkg/server/testserver.go
@@ -292,6 +292,11 @@ func (ts *TestServer) Node() interface{} {
 	return ts.node
 }
 
+// NodeID returns the ID of this node within its cluster.
+func (ts *TestServer) NodeID() roachpb.NodeID {
+	return ts.rpcContext.NodeID.Get()
+}
+
 // Stopper returns the embedded server's Stopper.
 func (ts *TestServer) Stopper() *stop.Stopper {
 	return ts.stopper

From 45a277ba6c8ec37245949d95f0a719d542589df5 Mon Sep 17 00:00:00 2001
From: Marcus Gartner
Date: Wed, 9 Sep 2020 18:07:21 -0700
Subject: [PATCH 4/7] partialidx: lazily initialize the implicator constraint
 cache
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

This commit makes the construction of the Implicator's constraint cache map
lazy. Instead of making the map in the `Init` method, it is made only when it
is needed. This fixes performance regressions in microbenchmarks.

name old time/op new time/op delta
Phases/kv-read-const/Normalize-16 170ns ± 0% 139ns ± 0% -18.66% (p=0.008 n=5+5)
Phases/kv-read-const/OptBuild-16 173ns ± 1% 141ns ± 1% -18.38% (p=0.008 n=5+5)
Phases/kv-read-const/Explore-16 183ns ± 1% 151ns ± 0% -17.70% (p=0.008 n=5+5)
Phases/kv-read-const/ExecBuild-16 674ns ± 2% 636ns ± 1% -5.58% (p=0.008 n=5+5)
Phases/kv-read/ExecBuild-16 13.3µs ± 1% 13.1µs ± 1% -1.78% (p=0.008 n=5+5)
Phases/tpcc-delivery/ExecBuild-16 29.4µs ± 1% 28.9µs ± 0% -1.64% (p=0.016 n=5+4)
Phases/tpcc-stock-level/Explore-16 153µs ± 1% 151µs ± 1% -1.24% (p=0.008 n=5+5)
Phases/kv-read-no-prep/Explore-16 42.3µs ± 1% 41.9µs ± 0% -0.85% (p=0.008 n=5+5)
Phases/tpcc-delivery/OptBuild-16 12.1µs ± 1% 12.1µs ± 0% -0.65% (p=0.032 n=5+5)
Phases/kv-read/Parse-16 2.49ns ± 1% 2.48ns ± 0% ~ (p=0.413 n=5+4)
Phases/kv-read/OptBuild-16 6.46µs ± 2% 6.44µs ± 1% ~ (p=0.841 n=5+5)
Phases/kv-read/Normalize-16 6.48µs ± 3% 6.48µs ± 2% ~ (p=1.000 n=5+5)
Phases/kv-read/Explore-16 12.1µs ± 1% 11.9µs ± 2% ~ (p=0.056 n=5+5)
Phases/kv-read-no-prep/OptBuild-16 32.4µs ± 3% 32.2µs ± 1% ~ (p=1.000 n=5+5)
Phases/kv-read-no-prep/Normalize-16 34.9µs ± 1% 34.7µs ± 2% ~ (p=0.222 n=5+5)
Phases/kv-read-no-prep/ExecBuild-16 44.0µs ± 3% 43.7µs ± 1% ~ (p=0.310 n=5+5)
Phases/kv-read-const/Parse-16 2.60ns ± 1% 2.60ns ± 1% ~ (p=0.643 n=5+5)
Phases/tpcc-new-order/Normalize-16 12.8µs ± 1% 13.9µs ±10% ~ (p=0.690 n=5+5)
Phases/tpcc-new-order/ExecBuild-16 38.5µs ± 1% 38.6µs ± 2% ~ (p=1.000 n=5+5)
Phases/tpcc-delivery/Parse-16 2.55ns ± 1% 2.56ns ± 1% ~ (p=0.921 n=5+5)
Phases/tpcc-delivery/Normalize-16 12.6µs ± 2% 12.5µs ± 1% ~ (p=0.151 n=5+5)
Phases/tpcc-delivery/Explore-16 28.0µs ± 2% 27.6µs ± 1% ~ (p=0.063 n=5+5)
Phases/tpcc-stock-level/OptBuild-16 52.8µs ± 1% 52.9µs ± 1% ~ (p=1.000 n=5+5)
Phases/tpcc-stock-level/Normalize-16 53.7µs ± 1% 53.7µs ± 1% ~ (p=1.000 n=5+5)
Phases/tpcc-stock-level/ExecBuild-16 160µs ± 1% 161µs ± 1% ~ (p=0.222 n=5+5)
Phases/tpcc-new-order/Explore-16 36.2µs ± 0% 36.7µs ± 1% +1.47% (p=0.016 n=4+5)
Phases/kv-read-no-prep/Parse-16 12.6µs ± 1% 12.8µs ± 1% +1.58% (p=0.016 n=4+5)
Phases/tpcc-new-order/Parse-16 2.58ns ± 1% 2.67ns ± 7% +3.33% (p=0.048 n=5+5)
Phases/tpcc-stock-level/Parse-16 2.52ns ± 1%
2.63ns ± 4% +4.29% (p=0.016 n=5+5) Phases/tpcc-new-order/OptBuild-16 13.0µs ± 1% 14.7µs ± 2% +12.50% (p=0.008 n=5+5) Release note: None --- pkg/sql/opt/partialidx/implicator.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/pkg/sql/opt/partialidx/implicator.go b/pkg/sql/opt/partialidx/implicator.go index 090721d39349..fbda9407d705 100644 --- a/pkg/sql/opt/partialidx/implicator.go +++ b/pkg/sql/opt/partialidx/implicator.go @@ -145,7 +145,6 @@ func (im *Implicator) Init(f *norm.Factory, md *opt.Metadata, evalCtx *tree.Eval im.f = f im.md = md im.evalCtx = evalCtx - im.constraintCache = make(map[opt.ScalarExpr]constraintCacheItem) } // FiltersImplyPredicate attempts to prove that a partial index predicate is @@ -647,9 +646,18 @@ func (im *Implicator) twoVarComparisonImpliesTwoVarComparison( return false, true } +// initConstraintCache initializes the constraintCache field if it has not yet +// been initialized. +func (im *Implicator) initConstraintCache() { + if im.constraintCache == nil { + im.constraintCache = make(map[opt.ScalarExpr]constraintCacheItem) + } +} + // cacheConstraint caches a constraint set and a tight boolean for the given // scalar expression. func (im *Implicator) cacheConstraint(e opt.ScalarExpr, c *constraint.Set, tight bool) { + im.initConstraintCache() if _, ok := im.constraintCache[e]; !ok { im.constraintCache[e] = constraintCacheItem{ c: c, @@ -662,6 +670,7 @@ func (im *Implicator) cacheConstraint(e opt.ScalarExpr, c *constraint.Set, tight // cache contains an entry for the given scalar expression. It returns // ok = false if the scalar expression does not exist in the cache. func (im *Implicator) fetchConstraint(e opt.ScalarExpr) (_ *constraint.Set, tight bool, ok bool) { + im.initConstraintCache() if res, ok := im.constraintCache[e]; ok { return res.c, res.tight, true } From 53e9ebf7c723d9e3074d8c1e26801f36cd7952dc Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Wed, 9 Sep 2020 12:21:06 -0600 Subject: [PATCH 5/7] roachtest: fix tpchvec/perf in some cases Previously, whenever a slowness threshold is exceeded, we would run EXPLAIN ANALYZE (DEBUG) on the query at fault using the same connection for all cluster setups. This would result in running the query only with vectorize=on which is not what we want. We will now be opening up new connections to the database after the cluster settings have been updated in order to get the correct behavior for each of the setups. Release note: None --- pkg/cmd/roachtest/tpchvec.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/pkg/cmd/roachtest/tpchvec.go b/pkg/cmd/roachtest/tpchvec.go index 519f92bb7927..adcdb752c8a4 100644 --- a/pkg/cmd/roachtest/tpchvec.go +++ b/pkg/cmd/roachtest/tpchvec.go @@ -331,9 +331,18 @@ func (p *tpchVecPerfTest) postTestRunHook( // "catch" the slowness). for setupIdx, setup := range runConfig.clusterSetups { performClusterSetup(t, conn, setup) + // performClusterSetup has changed the cluster settings; + // however, the session variables might contain the old values, + // so we will open up new connections for each of the setups in + // order to get the correct cluster setup on each. 
+ tempConn := c.Conn(ctx, 1) + defer tempConn.Close() + if _, err := tempConn.Exec("USE tpch;"); err != nil { + t.Fatal(err) + } for i := 0; i < runConfig.numRunsPerQuery; i++ { t.Status(fmt.Sprintf("\nRunning EXPLAIN ANALYZE (DEBUG) for setup=%s\n", runConfig.setupNames[setupIdx])) - rows, err := conn.Query(fmt.Sprintf( + rows, err := tempConn.Query(fmt.Sprintf( "EXPLAIN ANALYZE (DEBUG) %s;", tpch.QueriesByNumber[queryNum], )) if err != nil { From 69496970545a4299dee0d40e5df64163e76def7c Mon Sep 17 00:00:00 2001 From: Marcus Gartner Date: Thu, 10 Sep 2020 09:30:30 -0700 Subject: [PATCH 6/7] opt: embed JoinOrderBuilder in Optimizer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This commit embeds the JoinOrderBuilder in the Optimizer struct. This avoids allocating a new JoinOrderBuilder on the heap every time `Optimizer.Init` is called. BenchmarkPhase microbenchmarks comparing the current master branch to this commit: name old time/op new time/op delta Phases/kv-read-const/Normalize-16 171ns ± 1% 77ns ± 1% -55.11% (p=0.008 n=5+5) Phases/kv-read-const/OptBuild-16 175ns ± 1% 79ns ± 2% -54.83% (p=0.008 n=5+5) Phases/kv-read-const/Explore-16 189ns ± 7% 89ns ± 0% -53.14% (p=0.008 n=5+5) Phases/tpcc-new-order/OptBuild-16 16.6µs ± 6% 12.7µs ± 2% -23.47% (p=0.008 n=5+5) Phases/tpcc-new-order/Normalize-16 14.7µs ± 8% 12.4µs ± 1% -15.39% (p=0.008 n=5+5) Phases/kv-read-const/ExecBuild-16 687ns ± 4% 592ns ± 1% -13.89% (p=0.008 n=5+5) Phases/tpcc-new-order/Parse-16 2.94ns ± 8% 2.58ns ± 2% -12.23% (p=0.008 n=5+5) Phases/tpcc-delivery/Parse-16 2.86ns ± 8% 2.54ns ± 1% -11.33% (p=0.008 n=5+5) Phases/tpcc-new-order/ExecBuild-16 43.0µs ±10% 38.6µs ± 0% -10.21% (p=0.016 n=5+4) Phases/tpcc-new-order/Explore-16 39.0µs ± 3% 36.6µs ± 0% -6.12% (p=0.008 n=5+5) Phases/tpcc-delivery/OptBuild-16 12.4µs ± 1% 12.0µs ± 0% -3.49% (p=0.008 n=5+5) Phases/kv-read/ExecBuild-16 13.4µs ± 0% 12.9µs ± 0% -3.37% (p=0.008 n=5+5) Phases/kv-read/Explore-16 12.1µs ± 1% 11.7µs ± 2% -3.04% (p=0.008 n=5+5) Phases/tpcc-delivery/Normalize-16 12.8µs ± 1% 12.4µs ± 1% -3.00% (p=0.008 n=5+5) Phases/kv-read-no-prep/ExecBuild-16 44.1µs ± 2% 43.2µs ± 1% -2.16% (p=0.008 n=5+5) Phases/kv-read/Normalize-16 6.48µs ± 1% 6.36µs ± 1% -1.73% (p=0.008 n=5+5) Phases/kv-read-no-prep/Explore-16 42.6µs ± 1% 41.9µs ± 1% -1.55% (p=0.032 n=5+5) Phases/tpcc-delivery/ExecBuild-16 29.5µs ± 1% 29.1µs ± 0% -1.41% (p=0.008 n=5+5) Phases/kv-read-no-prep/OptBuild-16 32.2µs ± 0% 31.7µs ± 1% -1.34% (p=0.008 n=5+5) Phases/kv-read-no-prep/Normalize-16 34.8µs ± 1% 34.4µs ± 1% -1.25% (p=0.016 n=5+4) Phases/tpcc-stock-level/OptBuild-16 53.1µs ± 0% 52.7µs ± 0% -0.83% (p=0.016 n=5+5) Phases/kv-read/Parse-16 2.49ns ± 0% 2.49ns ± 1% ~ (p=0.952 n=4+5) Phases/kv-read/OptBuild-16 6.48µs ± 0% 6.44µs ± 1% ~ (p=0.310 n=5+5) Phases/kv-read-no-prep/Parse-16 12.7µs ± 2% 12.7µs ± 1% ~ (p=0.508 n=5+5) Phases/kv-read-const/Parse-16 2.59ns ± 1% 2.59ns ± 1% ~ (p=0.857 n=5+5) Phases/tpcc-delivery/Explore-16 27.9µs ± 0% 27.9µs ± 2% ~ (p=0.310 n=5+5) Phases/tpcc-stock-level/Parse-16 2.57ns ± 2% 2.54ns ± 3% ~ (p=0.254 n=5+5) Phases/tpcc-stock-level/Normalize-16 53.8µs ± 0% 53.5µs ± 2% ~ (p=0.151 n=5+5) Phases/tpcc-stock-level/Explore-16 153µs ± 0% 152µs ± 3% ~ (p=0.151 n=5+5) Phases/tpcc-stock-level/ExecBuild-16 162µs ± 1% 161µs ± 1% ~ (p=0.095 n=5+5) Release note: None --- pkg/sql/opt/xform/optimizer.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/sql/opt/xform/optimizer.go b/pkg/sql/opt/xform/optimizer.go index 
1f25de55acc7..1ce78755c03c 100644 --- a/pkg/sql/opt/xform/optimizer.go +++ b/pkg/sql/opt/xform/optimizer.go @@ -95,7 +95,7 @@ type Optimizer struct { disabledRules RuleSet // JoinOrderBuilder adds new join orderings to the memo. - jb *JoinOrderBuilder + jb JoinOrderBuilder } // Init initializes the Optimizer with a new, blank memo structure inside. This @@ -112,7 +112,7 @@ func (o *Optimizer) Init(evalCtx *tree.EvalContext, catalog cat.Catalog) { o.matchedRule = nil o.appliedRule = nil o.disabledRules = util.FastIntSet{} - o.jb = &JoinOrderBuilder{} + o.jb = JoinOrderBuilder{} if evalCtx.TestingKnobs.DisableOptimizerRuleProbability > 0 { o.disableRules(evalCtx.TestingKnobs.DisableOptimizerRuleProbability) } @@ -151,7 +151,7 @@ func (o *Optimizer) SetCoster(coster Coster) { // JoinOrderBuilder returns the JoinOrderBuilder instance that the optimizer is // currently using to reorder join trees. func (o *Optimizer) JoinOrderBuilder() *JoinOrderBuilder { - return o.jb + return &o.jb } // DisableOptimizations disables all transformation rules, including normalize From 78b5535ee45be87e07a40334763b9d77517ab7b3 Mon Sep 17 00:00:00 2001 From: irfan sharif Date: Tue, 8 Sep 2020 19:21:58 -0400 Subject: [PATCH 7/7] server: introduce Test{ClusterConnectivity,JoinVersionGate} TestClusterConnectivity sets up an uninitialized cluster with custom join flags (individual nodes point to specific others, constructed using connectivity matrices). Included in the test configurations are the more "standard" ones where every node points to n1, or where there are bidirectional links set up between nodes. It tests various topologies and ensures that cluster IDs are distributed throughout connected components, and store/node IDs are allocated in the way we'd expect them to be. TestJoinVersionGate checks to see that improperly versioned cockroach nodes are not able to join a cluster. Release note: None --- pkg/server/connectivity_test.go | 334 ++++++++++++++++++ pkg/testutils/serverutils/test_server_shim.go | 5 + 2 files changed, 339 insertions(+) create mode 100644 pkg/server/connectivity_test.go diff --git a/pkg/server/connectivity_test.go b/pkg/server/connectivity_test.go new file mode 100644 index 000000000000..dc6fbe12fbf4 --- /dev/null +++ b/pkg/server/connectivity_test.go @@ -0,0 +1,334 @@ +// Copyright 2016 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package server_test + +import ( + "context" + "fmt" + "net" + "sort" + "sync" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/clusterversion" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/server/serverpb" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/cockroachdb/errors" + "google.golang.org/grpc" +) + +// TestClusterConnectivity sets up an uninitialized cluster with custom join +// flags (individual nodes point to specific others, instead of all pointing to +// n1), and tests that the cluster/node IDs are distributed correctly +// throughout. +func TestClusterConnectivity(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + // TODO(irfansharif): Teach TestServer to accept a list of join addresses + // instead of just one. + + var testConfigurations = []struct { + // bootstrapNode controls which node is `cockroach init`-ialized. + // Everything is 0-indexed. + bootstrapNode int + + // joinConfig[i] returns the node the i-th node is pointing to through + // its join flags. Everything is 0-indexed. + joinConfig []int + }{ + // 0. Every node points to the first, including the first. + {0, []int{0, 0, 0, 0, 0}}, + + // 1. Every node points to the previous, except the first, which points to + // itself. + // + // 0 <-- 1 <-- 2 <-- 3 <-- 4 + {0, []int{0, 0, 1, 2, 3}}, + + // 2. Same as previous, but a few links switched around. + // + // 0 <-- 2 <-- 1 <-- 3 <-- 4 + {0, []int{0, 2, 0, 1, 3}}, + + // 3. Introduce a bidirectional link. + // + // 1 <-> 2 <-- 0 <-- 3 + // 1 <-- 4 + {1, []int{2, 2, 1, 0, 1}}, + + // 4. Same as above, but bootstrap the other node in the bidirectional + // link. + // + // 1 <-> 2 <-- 0 <-- 3 + // 1 <-- 4 + {2, []int{2, 2, 1, 0, 1}}, + + // 5. Another topology centered around node 1, which itself is pointed + // to node 0. + // + // 0 <-> 1 <-- 2 + // 1 <-- 3 + {0, []int{1, 0, 1, 1}}, + + // 6. Same as above, but bootstrapping the centered node directly. + // + // 0 <-> 1 <-- 2 + // 1 <-- 3 + {1, []int{1, 0, 1, 1}}, + + // TODO(irfansharif): We would really like to be able to set up test + // clusters that are only partially connected, and assert that only + // nodes that are supposed to find out about bootstrap, actually do. + // Something like: + // + // 0 <-> 1 <-- 2 + // 5 <-- 4 <-- 3 <-- 5 + // + // A version of this was originally prototyped in #52526 but the changes + // required in Test{Cluster,Server} were too invasive to justify at the + // time. + } + + // getListener is a short hand to allocate a listener to an unbounded port. + getListener := func() net.Listener { + t.Helper() + + listener, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + return listener + } + baseServerArgs := base.TestServerArgs{ + // We're going to manually control initialization in this test. + NoAutoInitializeCluster: true, + StoreSpecs: []base.StoreSpec{{InMemory: true}}, + } + + for i, test := range testConfigurations { + t.Run(fmt.Sprintf("topology=%d", i), func(t *testing.T) { + numNodes := len(test.joinConfig) + var serverArgsPerNode = make(map[int]base.TestServerArgs) + + // We start off with installing a listener for each server. 
We + // pre-bind a listener so the kernel can go ahead and assign an + // address for us. We'll later use this address to populate join + // flags for neighboring nodes. + var listeners = make([]net.Listener, numNodes) + for i := 0; i < numNodes; i++ { + listener := getListener() + + serverArg := baseServerArgs + serverArg.Listener = listener + serverArg.Addr = listener.Addr().String() + serverArgsPerNode[i] = serverArg + listeners[i] = listener + } + + // We'll annotate the server args with the right join flags. + for i := 0; i < numNodes; i++ { + joinNode := test.joinConfig[i] + joinAddr := listeners[joinNode].Addr().String() + + serverArg := serverArgsPerNode[i] + serverArg.JoinAddr = joinAddr + serverArgsPerNode[i] = serverArg + } + + tcArgs := base.TestClusterArgs{ + // Saves time in this test. + ReplicationMode: base.ReplicationManual, + ServerArgsPerNode: serverArgsPerNode, + + // We have to start servers in parallel because we're looking to + // bootstrap the cluster manually in a separate thread. Each + // individual Server.Start is a blocking call (it waits for + // init). We want to start all of them in parallel to simulate a + // bunch of servers each waiting for init. + ParallelStart: true, + } + + // The test structure here is a bit convoluted, but necessary given + // the current implementation of TestCluster. TestCluster.Start + // wants to wait for all the nodes in the test cluster to be fully + // initialized before returning. Given we're testing initialization + // behavior, we do all the real work in a separate thread and keep + // the main thread limited to simply starting and stopping the test + // cluster. + // + // NB: That aside, TestCluster very much wants to live on the main + // goroutine running the test. That's mostly to do with its internal + // error handling and the limitations imposed by + // https://golang.org/pkg/testing/#T.FailNow (which sits underneath + // t.Fatal). + + tc := testcluster.NewTestCluster(t, numNodes, tcArgs) + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + + // Attempt to bootstrap the cluster through the configured node. + bootstrapNode := test.bootstrapNode + testutils.SucceedsSoon(t, func() (e error) { + ctx := context.Background() + serv := tc.Server(bootstrapNode) + + dialOpts, err := tc.Server(bootstrapNode).RPCContext().GRPCDialOptions() + if err != nil { + return err + } + + conn, err := grpc.DialContext(ctx, serv.ServingRPCAddr(), dialOpts...) + if err != nil { + return err + } + defer func() { + _ = conn.Close() + }() + + client := serverpb.NewInitClient(conn) + _, err = client.Bootstrap(context.Background(), &serverpb.BootstrapRequest{}) + return err + }) + + // Wait to get a real cluster ID (doesn't always get populated + // right after bootstrap). + testutils.SucceedsSoon(t, func() error { + clusterID := tc.Server(bootstrapNode).ClusterID() + if clusterID.Equal(uuid.UUID{}) { + return errors.New("cluster ID still not recorded") + } + return nil + }) + + clusterID := tc.Server(bootstrapNode).ClusterID() + testutils.SucceedsSoon(t, func() error { + var nodeIDs []roachpb.NodeID + var storeIDs []roachpb.StoreID + + // Sanity check that all the nodes we expect to join this + // network actually do (by checking they discover the right + // cluster ID). Also collect node/store IDs for below. 
+ for i := 0; i < numNodes; i++ { + if got := tc.Server(i).ClusterID(); got != clusterID { + return errors.Newf("mismatched cluster IDs; %s (for node %d) != %s (for node %d)", + clusterID.String(), bootstrapNode, got.String(), i) + } + + nodeIDs = append(nodeIDs, tc.Server(i).NodeID()) + storeIDs = append(storeIDs, tc.Server(i).GetFirstStoreID()) + } + + sort.Slice(nodeIDs, func(i, j int) bool { + return nodeIDs[i] < nodeIDs[j] + }) + sort.Slice(storeIDs, func(i, j int) bool { + return storeIDs[i] < storeIDs[j] + }) + + // Double check that we have the full set of node/store IDs + // we expect. + for i := 1; i <= len(nodeIDs); i++ { + expNodeID := roachpb.NodeID(i) + if got := nodeIDs[i-1]; got != expNodeID { + return errors.Newf("unexpected node ID; expected %s, got %s", expNodeID.String(), got.String()) + } + + expStoreID := roachpb.StoreID(i) + if got := storeIDs[i-1]; got != expStoreID { + return errors.Newf("unexpected store ID; expected %s, got %s", expStoreID.String(), got.String()) + } + } + + return nil + }) + }() + + // Start the test cluster. This is a blocking call, and expects the + // configured number of servers in the cluster to be fully + // initialized before it returns. Given that the initialization + // happens in the other thread, we'll only get past it after having + // bootstrapped the test cluster in the thread above. + tc.Start(t) + defer tc.Stopper().Stop(context.Background()) + + wg.Wait() + }) + } +} + +// TestJoinVersionGate checks to see that improperly versioned cockroach nodes +// are not able to join a cluster. +func TestJoinVersionGate(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + commonArg := base.TestServerArgs{ + StoreSpecs: []base.StoreSpec{ + {InMemory: true}, + }, + } + + numNodes := 3 + tcArgs := base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, // Saves time in this test. 
+ ServerArgs: commonArg, + ParallelStart: true, + } + + tc := testcluster.StartTestCluster(t, numNodes, tcArgs) + defer tc.Stopper().Stop(context.Background()) + + testutils.SucceedsSoon(t, func() error { + for i := 0; i < numNodes; i++ { + clusterID := tc.Server(0).ClusterID() + got := tc.Server(i).ClusterID() + + if got != clusterID { + return errors.Newf("mismatched cluster IDs; %s (for node %d) != %s (for node %d)", clusterID.String(), 0, got.String(), i) + } + } + return nil + }) + + var newVersion = clusterversion.TestingBinaryVersion + var oldVersion = prev(newVersion) + + knobs := base.TestingKnobs{ + Server: &server.TestingKnobs{ + BinaryVersionOverride: oldVersion, + }, + } + + oldVersionServerArgs := commonArg + oldVersionServerArgs.Knobs = knobs + oldVersionServerArgs.JoinAddr = tc.Servers[0].ServingRPCAddr() + + serv, err := tc.AddServer(oldVersionServerArgs) + if err != nil { + t.Fatal(err) + } + defer serv.Stop() + + if err := serv.Start(); !errors.Is(errors.Cause(err), server.ErrIncompatibleBinaryVersion) { + t.Fatalf("expected error %s, got %v", server.ErrIncompatibleBinaryVersion.Error(), err.Error()) + } +} diff --git a/pkg/testutils/serverutils/test_server_shim.go b/pkg/testutils/serverutils/test_server_shim.go index fe5108e9bf35..48741147cfbf 100644 --- a/pkg/testutils/serverutils/test_server_shim.go +++ b/pkg/testutils/serverutils/test_server_shim.go @@ -35,6 +35,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/httputil" "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/cockroach/pkg/util/uuid" ) // TestServerInterface defines test server functionality that tests need; it is @@ -50,6 +51,10 @@ type TestServerInterface interface { // NodeID returns the ID of this node within its cluster. NodeID() roachpb.NodeID + // ClusterID returns the cluster ID as understood by this node in the + // cluster. + ClusterID() uuid.UUID + // ServingRPCAddr returns the server's advertised address. ServingRPCAddr() string
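
A usage sketch (not part of the patch series): the Listener field added to
TestServerArgs in patch 2, combined with an explicit per-node JoinAddr, is
what lets a test dictate its own join topology. The hypothetical test below
shows the minimal wiring for a two-node cluster in which n2 joins n1 through
a pre-bound listener. The test name and the two-node layout are invented for
illustration; the fields and helpers used (Listener, Addr, JoinAddr,
ServerArgsPerNode, StartTestCluster) are the ones introduced or exercised by
the patches above. Unlike TestClusterConnectivity, node 0 is left to
auto-bootstrap here, so no manual Bootstrap RPC or ParallelStart is needed.

package server_test

import (
	"context"
	"net"
	"testing"

	"github.com/cockroachdb/cockroach/pkg/base"
	"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
)

func TestTwoNodeCustomJoinSketch(t *testing.T) {
	// Pre-bind one listener per node so both addresses are known before any
	// server starts; the Server takes ownership of closing them.
	var listeners [2]net.Listener
	for i := range listeners {
		l, err := net.Listen("tcp", "127.0.0.1:0")
		if err != nil {
			t.Fatal(err)
		}
		listeners[i] = l
	}

	serverArgsPerNode := map[int]base.TestServerArgs{
		0: {
			StoreSpecs: []base.StoreSpec{{InMemory: true}},
			Listener:   listeners[0],
			Addr:       listeners[0].Addr().String(),
			// No JoinAddr: node 0 bootstraps the cluster as usual.
		},
		1: {
			StoreSpecs: []base.StoreSpec{{InMemory: true}},
			Listener:   listeners[1],
			Addr:       listeners[1].Addr().String(),
			// Explicit join flag pointing at node 0's pre-bound address.
			JoinAddr: listeners[0].Addr().String(),
		},
	}

	tc := testcluster.StartTestCluster(t, 2, base.TestClusterArgs{
		ReplicationMode:   base.ReplicationManual, // saves time, as in the tests above
		ServerArgsPerNode: serverArgsPerNode,
	})
	defer tc.Stopper().Stop(context.Background())
}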