diff --git a/pkg/sql/physicalplan/aggregator_funcs_test.go b/pkg/sql/physicalplan/aggregator_funcs_test.go
index e8db1e11c658..d32e89d45b24 100644
--- a/pkg/sql/physicalplan/aggregator_funcs_test.go
+++ b/pkg/sql/physicalplan/aggregator_funcs_test.go
@@ -61,11 +61,11 @@ var (
 // Any errors stop the current test.
 func runTestFlow(
 	t *testing.T,
-	srv serverutils.TestServerInterface,
+	ts serverutils.ApplicationLayerInterface,
 	txn *kv.Txn,
 	procs ...execinfrapb.ProcessorSpec,
 ) (rowenc.EncDatumRows, error) {
-	distSQLSrv := srv.DistSQLServer().(*distsql.ServerImpl)
+	distSQLSrv := ts.DistSQLServer().(*distsql.ServerImpl)
 
 	leafInputState, err := txn.GetLeafTxnInputState(context.Background())
 	if err != nil {
@@ -124,7 +124,7 @@ func runTestFlow(
 func checkDistAggregationInfo(
 	ctx context.Context,
 	t *testing.T,
-	srv serverutils.TestServerInterface,
+	ts serverutils.ApplicationLayerInterface,
 	tableDesc catalog.TableDescriptor,
 	colIndexes []int,
 	numRows int,
@@ -146,17 +146,17 @@ func checkDistAggregationInfo(
 		Spans: make([]roachpb.Span, 1),
 	}
 	if err := rowenc.InitIndexFetchSpec(
-		&tr.FetchSpec, srv.Codec(), tableDesc, tableDesc.GetPrimaryIndex(), columnIDs,
+		&tr.FetchSpec, ts.Codec(), tableDesc, tableDesc.GetPrimaryIndex(), columnIDs,
 	); err != nil {
 		t.Fatal(err)
 	}
 
 	var err error
-	tr.Spans[0].Key, err = randgen.TestingMakePrimaryIndexKeyForTenant(tableDesc, srv.Codec(), startPK)
+	tr.Spans[0].Key, err = randgen.TestingMakePrimaryIndexKeyForTenant(tableDesc, ts.Codec(), startPK)
 	if err != nil {
 		t.Fatal(err)
 	}
-	tr.Spans[0].EndKey, err = randgen.TestingMakePrimaryIndexKeyForTenant(tableDesc, srv.Codec(), endPK)
+	tr.Spans[0].EndKey, err = randgen.TestingMakePrimaryIndexKeyForTenant(tableDesc, ts.Codec(), endPK)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -217,7 +217,7 @@ func checkDistAggregationInfo(
 		varIdxs[i] = i
 	}
 
-	txn := kv.NewTxn(ctx, srv.DB(), srv.NodeID())
+	txn := kv.NewTxn(ctx, ts.DB(), ts.DistSQLPlanningNodeID())
 
 	// First run a flow that aggregates all the rows without any local stages.
 	nonDistFinalOutputTypes := finalOutputTypes
@@ -236,7 +236,7 @@ func checkDistAggregationInfo(
 	}
 
 	rowsNonDist, nonDistErr := runTestFlow(
-		t, srv, txn,
+		t, ts, txn,
 		makeTableReader(1, numRows+1, 0),
 		execinfrapb.ProcessorSpec{
 			Input: []execinfrapb.InputSyncSpec{{
@@ -341,7 +341,7 @@ func checkDistAggregationInfo(
 	}
 	procs = append(procs, finalProc)
 
-	rowsDist, distErr := runTestFlow(t, srv, txn, procs...)
+	rowsDist, distErr := runTestFlow(t, ts, txn, procs...)
 
 	if distErr != nil || nonDistErr != nil {
 		pgCodeDistErr := pgerror.GetPGCode(distErr)
@@ -527,9 +527,7 @@ func TestSingleArgumentDistAggregateFunctions(t *testing.T) {
 			name := fmt.Sprintf("%s/%s/%d", fn, col.GetName(), numRows)
 			t.Run(name, func(t *testing.T) {
 				checkDistAggregationInfo(
-					// TODO(#76378): pass ts, not srv, here.
-					context.Background(), t, srv, desc, []int{col.Ordinal()},
-					numRows, fn, info,
+					context.Background(), t, ts, desc, []int{col.Ordinal()}, numRows, fn, info,
 				)
 			})
 		}
@@ -550,9 +548,7 @@ func TestTwoArgumentRegressionAggregateFunctions(t *testing.T) {
 	defer log.Scope(t).Close(t)
 
 	const numRows = 100
-	srv, db, kvDB := serverutils.StartServer(t, base.TestServerArgs{
-		DefaultTestTenant: base.TestIsForStuffThatShouldWorkWithSecondaryTenantsButDoesntYet(108763),
-	})
+	srv, db, kvDB := serverutils.StartServer(t, base.TestServerArgs{})
 	defer srv.Stopper().Stop(context.Background())
 	ts := srv.ApplicationLayer()
 
@@ -600,8 +596,7 @@ func TestTwoArgumentRegressionAggregateFunctions(t *testing.T) {
 			name := fmt.Sprintf("%s/%s-%s/%d", fn, cols[i].GetName(), cols[j].GetName(), numRows)
 			t.Run(name, func(t *testing.T) {
 				checkDistAggregationInfo(
-					// TODO(#76378): pass ts, not srv, here.
-					context.Background(), t, srv, desc, []int{i, j}, numRows,
+					context.Background(), t, ts, desc, []int{i, j}, numRows,
 					fn, info,
 				)
 			})
diff --git a/pkg/sql/physicalplan/fake_span_resolver_test.go b/pkg/sql/physicalplan/fake_span_resolver_test.go
index 66542c18a3f3..f5dfa6cfbbfa 100644
--- a/pkg/sql/physicalplan/fake_span_resolver_test.go
+++ b/pkg/sql/physicalplan/fake_span_resolver_test.go
@@ -32,17 +32,14 @@ import (
 
 func TestFakeSpanResolver(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
-	ctx := context.Background()
-	tc := serverutils.StartCluster(t, 3, base.TestClusterArgs{
-		ServerArgs: base.TestServerArgs{
-			DefaultTestTenant: base.TestIsForStuffThatShouldWorkWithSecondaryTenantsButDoesntYet(108763),
-		},
-	})
+	ctx := context.Background()
+	tc := serverutils.StartCluster(t, 3, base.TestClusterArgs{})
 	defer tc.Stopper().Stop(ctx)
+	ts := tc.ApplicationLayer(0)
 
 	sqlutils.CreateTable(
-		t, tc.ServerConn(0), "t",
+		t, ts.SQLConn(t, ""), "t",
 		"k INT PRIMARY KEY, v INT",
 		100,
 		func(row int) []tree.Datum {
@@ -55,15 +52,13 @@ func TestFakeSpanResolver(t *testing.T) {
 
 	resolver := physicalplanutils.FakeResolverForTestCluster(tc)
 
-	db := tc.Server(0).DB()
-
-	txn := kv.NewTxn(ctx, db, tc.Server(0).NodeID())
+	txn := kv.NewTxn(ctx, ts.DB(), ts.DistSQLPlanningNodeID())
 	it := resolver.NewSpanResolverIterator(txn, nil)
 
-	tableDesc := desctestutils.TestingGetPublicTableDescriptor(db, keys.SystemSQLCodec, "test", "t")
+	tableDesc := desctestutils.TestingGetPublicTableDescriptor(ts.DB(), ts.Codec(), "test", "t")
 	primIdxValDirs := catalogkeys.IndexKeyValDirs(tableDesc.GetPrimaryIndex())
 
-	span := tableDesc.PrimaryIndexSpan(keys.SystemSQLCodec)
+	span := tableDesc.PrimaryIndexSpan(ts.Codec())
 
 	// Make sure we see all the nodes. It will not always happen (due to
 	// randomness) but it should happen most of the time.
diff --git a/pkg/sql/physicalplan/span_resolver_test.go b/pkg/sql/physicalplan/span_resolver_test.go
index cc1987af54d1..30100da9c706 100644
--- a/pkg/sql/physicalplan/span_resolver_test.go
+++ b/pkg/sql/physicalplan/span_resolver_test.go
@@ -39,20 +39,21 @@ import (
 
 func TestSpanResolverUsesCaches(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
+
+	ctx := context.Background()
 	tc := testcluster.StartTestCluster(t, 4, base.TestClusterArgs{
 		ReplicationMode: base.ReplicationManual,
 		ServerArgs: base.TestServerArgs{
 			DefaultTestTenant: base.TestIsForStuffThatShouldWorkWithSecondaryTenantsButDoesntYet(108763),
-			UseDatabase:       "t",
 		},
 	})
-	defer tc.Stopper().Stop(context.Background())
+	defer tc.Stopper().Stop(ctx)
 
 	rowRanges, _ := setupRanges(
-		tc.ServerConn(0),
-		tc.Server(0).ApplicationLayer(),
-		tc.Server(0).StorageLayer(),
+		tc.ApplicationLayer(0).SQLConn(t, "t"),
+		tc.ApplicationLayer(0),
+		tc.StorageLayer(0),
 		t)
 
 	// Replicate the row ranges on all of the first 3 nodes. Save the 4th node in
@@ -85,7 +86,7 @@ func TestSpanResolverUsesCaches(t *testing.T) {
 	}
 
 	// Create a SpanResolver using the 4th node, with empty caches.
-	s3 := tc.Server(3).ApplicationLayer()
+	s3 := tc.ApplicationLayer(3)
 
 	lr := physicalplan.NewSpanResolver(
 		s3.ClusterSettings(),
@@ -112,16 +113,15 @@ func TestSpanResolverUsesCaches(t *testing.T) {
 
 	// Resolve the spans. Since the range descriptor cache doesn't have any
 	// leases, all the ranges should be grouped and "assigned" to replica 0.
-	replicas, err := resolveSpans(context.Background(), lr.NewSpanResolverIterator(nil, nil), spans...)
+	replicas, err := resolveSpans(ctx, lr.NewSpanResolverIterator(nil, nil), spans...)
 	if err != nil {
 		t.Fatal(err)
 	}
 	if len(replicas) != 3 {
 		t.Fatalf("expected replies for 3 spans, got %d", len(replicas))
 	}
 
-	si := tc.Servers[0]
-	storeID := si.GetFirstStoreID()
+	storeID := tc.Servers[0].GetFirstStoreID()
 	for i := 0; i < 3; i++ {
 		if len(replicas[i]) != 1 {
 			t.Fatalf("expected 1 range for span %s, got %d",
@@ -134,12 +134,19 @@ func TestSpanResolverUsesCaches(t *testing.T) {
 		}
 	}
 
-	// Now populate the cached on node 4 and query again. This time, we expect to see
-	// each span on its own range.
-	if err := populateCache(tc.Conns[3], 3 /* expectedNumRows */); err != nil {
+	// Now populate the cache on node 4 and query again. Note that this way of
+	// populating the range cache is the reason for why this test is disabled
+	// with the default test tenant (#108763).
+	numExpectedRows := len(rowRanges)
+	var numRows int
+	err = s3.SQLConn(t, "t").QueryRow(`SELECT count(1) FROM test`).Scan(&numRows)
+	if err != nil {
 		t.Fatal(err)
 	}
-	replicas, err = resolveSpans(context.Background(), lr.NewSpanResolverIterator(nil, nil), spans...)
+	if numRows != numExpectedRows {
+		t.Fatal(errors.Errorf("expected %d rows, got %d", numExpectedRows, numRows))
+	}
+	replicas, err = resolveSpans(ctx, lr.NewSpanResolverIterator(nil, nil), spans...)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -153,20 +160,6 @@ func TestSpanResolverUsesCaches(t *testing.T) {
 	}
 }
 
-// populateCache runs a scan over a whole table to populate the range cache and
-// the lease holder cache of the server to which db is connected.
-func populateCache(db *gosql.DB, expectedNumRows int) error {
-	var numRows int
-	err := db.QueryRow(`SELECT count(1) FROM test`).Scan(&numRows)
-	if err != nil {
-		return err
-	}
-	if numRows != expectedNumRows {
-		return errors.Errorf("expected %d rows, got %d", expectedNumRows, numRows)
-	}
-	return nil
-}
-
 // splitRangeAtVal splits the range for a table with schema
 // `CREATE TABLE test (k INT PRIMARY KEY)` at row with value pk (the row will be
 // the first on the right of the split).