physicalplan: make a couple of tests work with test tenant #108825

Merged
merged 1 commit on Aug 16, 2023
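The change applies one pattern across all three test files: helpers that only need SQL-level functionality now take serverutils.ApplicationLayerInterface (obtained from the server via ApplicationLayer()) instead of the full serverutils.TestServerInterface, so the same code runs against the test tenant as well. Below is a minimal sketch of that pattern. It reuses only identifiers that appear in this diff; the helper name useApplicationLayer, the test name TestApplicationLayerSketch, and the package placement are hypothetical.

package physicalplan_test

import (
	"context"
	"testing"

	"github.com/cockroachdb/cockroach/pkg/base"
	"github.com/cockroachdb/cockroach/pkg/kv"
	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
)

// useApplicationLayer stands in for helpers like runTestFlow and
// checkDistAggregationInfo: it needs only the application layer, not the
// whole test server.
func useApplicationLayer(t *testing.T, ts serverutils.ApplicationLayerInterface) {
	// The codec and KV access are tenant-aware when taken from the
	// application layer.
	_ = ts.Codec()
	txn := kv.NewTxn(context.Background(), ts.DB(), ts.DistSQLPlanningNodeID())
	_ = txn
}

func TestApplicationLayerSketch(t *testing.T) {
	// No DefaultTestTenant override is needed; the helper works against the
	// tenant's application layer either way.
	srv, _, _ := serverutils.StartServer(t, base.TestServerArgs{})
	defer srv.Stopper().Stop(context.Background())
	useApplicationLayer(t, srv.ApplicationLayer())
}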
29 changes: 12 additions & 17 deletions pkg/sql/physicalplan/aggregator_funcs_test.go
@@ -61,11 +61,11 @@ var (
// Any errors stop the current test.
func runTestFlow(
t *testing.T,
srv serverutils.TestServerInterface,
ts serverutils.ApplicationLayerInterface,
txn *kv.Txn,
procs ...execinfrapb.ProcessorSpec,
) (rowenc.EncDatumRows, error) {
distSQLSrv := srv.DistSQLServer().(*distsql.ServerImpl)
distSQLSrv := ts.DistSQLServer().(*distsql.ServerImpl)

leafInputState, err := txn.GetLeafTxnInputState(context.Background())
if err != nil {
@@ -124,7 +124,7 @@ func runTestFlow(
func checkDistAggregationInfo(
ctx context.Context,
t *testing.T,
srv serverutils.TestServerInterface,
ts serverutils.ApplicationLayerInterface,
tableDesc catalog.TableDescriptor,
colIndexes []int,
numRows int,
@@ -146,17 +146,17 @@ func checkDistAggregationInfo(
Spans: make([]roachpb.Span, 1),
}
if err := rowenc.InitIndexFetchSpec(
&tr.FetchSpec, srv.Codec(), tableDesc, tableDesc.GetPrimaryIndex(), columnIDs,
&tr.FetchSpec, ts.Codec(), tableDesc, tableDesc.GetPrimaryIndex(), columnIDs,
); err != nil {
t.Fatal(err)
}

var err error
tr.Spans[0].Key, err = randgen.TestingMakePrimaryIndexKeyForTenant(tableDesc, srv.Codec(), startPK)
tr.Spans[0].Key, err = randgen.TestingMakePrimaryIndexKeyForTenant(tableDesc, ts.Codec(), startPK)
if err != nil {
t.Fatal(err)
}
tr.Spans[0].EndKey, err = randgen.TestingMakePrimaryIndexKeyForTenant(tableDesc, srv.Codec(), endPK)
tr.Spans[0].EndKey, err = randgen.TestingMakePrimaryIndexKeyForTenant(tableDesc, ts.Codec(), endPK)
if err != nil {
t.Fatal(err)
}
@@ -217,7 +217,7 @@ func checkDistAggregationInfo(
varIdxs[i] = i
}

txn := kv.NewTxn(ctx, srv.DB(), srv.NodeID())
txn := kv.NewTxn(ctx, ts.DB(), ts.DistSQLPlanningNodeID())

// First run a flow that aggregates all the rows without any local stages.
nonDistFinalOutputTypes := finalOutputTypes
@@ -236,7 +236,7 @@ func checkDistAggregationInfo(
}

rowsNonDist, nonDistErr := runTestFlow(
t, srv, txn,
t, ts, txn,
makeTableReader(1, numRows+1, 0),
execinfrapb.ProcessorSpec{
Input: []execinfrapb.InputSyncSpec{{
@@ -341,7 +341,7 @@ func checkDistAggregationInfo(
}

procs = append(procs, finalProc)
rowsDist, distErr := runTestFlow(t, srv, txn, procs...)
rowsDist, distErr := runTestFlow(t, ts, txn, procs...)

if distErr != nil || nonDistErr != nil {
pgCodeDistErr := pgerror.GetPGCode(distErr)
@@ -527,9 +527,7 @@ func TestSingleArgumentDistAggregateFunctions(t *testing.T) {
name := fmt.Sprintf("%s/%s/%d", fn, col.GetName(), numRows)
t.Run(name, func(t *testing.T) {
checkDistAggregationInfo(
// TODO(#76378): pass ts, not srv, here.
context.Background(), t, srv, desc, []int{col.Ordinal()},
numRows, fn, info,
context.Background(), t, ts, desc, []int{col.Ordinal()}, numRows, fn, info,
)
})
}
@@ -550,9 +548,7 @@ func TestTwoArgumentRegressionAggregateFunctions(t *testing.T) {
defer log.Scope(t).Close(t)
const numRows = 100

srv, db, kvDB := serverutils.StartServer(t, base.TestServerArgs{
DefaultTestTenant: base.TestIsForStuffThatShouldWorkWithSecondaryTenantsButDoesntYet(108763),
})
srv, db, kvDB := serverutils.StartServer(t, base.TestServerArgs{})
defer srv.Stopper().Stop(context.Background())
ts := srv.ApplicationLayer()

@@ -600,8 +596,7 @@ func TestTwoArgumentRegressionAggregateFunctions(t *testing.T) {
name := fmt.Sprintf("%s/%s-%s/%d", fn, cols[i].GetName(), cols[j].GetName(), numRows)
t.Run(name, func(t *testing.T) {
checkDistAggregationInfo(
// TODO(#76378): pass ts, not srv, here.
context.Background(), t, srv, desc, []int{i, j}, numRows,
context.Background(), t, ts, desc, []int{i, j}, numRows,
fn, info,
)
})
19 changes: 7 additions & 12 deletions pkg/sql/physicalplan/fake_span_resolver_test.go
@@ -32,17 +32,14 @@ import (
func TestFakeSpanResolver(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()

tc := serverutils.StartCluster(t, 3, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
DefaultTestTenant: base.TestIsForStuffThatShouldWorkWithSecondaryTenantsButDoesntYet(108763),
},
})
ctx := context.Background()
tc := serverutils.StartCluster(t, 3, base.TestClusterArgs{})
defer tc.Stopper().Stop(ctx)
ts := tc.ApplicationLayer(0)

sqlutils.CreateTable(
t, tc.ServerConn(0), "t",
t, ts.SQLConn(t, ""), "t",
"k INT PRIMARY KEY, v INT",
100,
func(row int) []tree.Datum {
@@ -55,15 +52,13 @@ func TestFakeSpanResolver(t *testing.T) {

resolver := physicalplanutils.FakeResolverForTestCluster(tc)

db := tc.Server(0).DB()

txn := kv.NewTxn(ctx, db, tc.Server(0).NodeID())
txn := kv.NewTxn(ctx, ts.DB(), ts.DistSQLPlanningNodeID())
it := resolver.NewSpanResolverIterator(txn, nil)

tableDesc := desctestutils.TestingGetPublicTableDescriptor(db, keys.SystemSQLCodec, "test", "t")
tableDesc := desctestutils.TestingGetPublicTableDescriptor(ts.DB(), ts.Codec(), "test", "t")
primIdxValDirs := catalogkeys.IndexKeyValDirs(tableDesc.GetPrimaryIndex())

span := tableDesc.PrimaryIndexSpan(keys.SystemSQLCodec)
span := tableDesc.PrimaryIndexSpan(ts.Codec())

// Make sure we see all the nodes. It will not always happen (due to
// randomness) but it should happen most of the time.
47 changes: 20 additions & 27 deletions pkg/sql/physicalplan/span_resolver_test.go
@@ -39,20 +39,21 @@ import (
func TestSpanResolverUsesCaches(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
tc := testcluster.StartTestCluster(t, 4,
base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{
DefaultTestTenant: base.TestIsForStuffThatShouldWorkWithSecondaryTenantsButDoesntYet(108763),
UseDatabase: "t",
},
})
defer tc.Stopper().Stop(context.Background())
defer tc.Stopper().Stop(ctx)

rowRanges, _ := setupRanges(
tc.ServerConn(0),
tc.Server(0).ApplicationLayer(),
tc.Server(0).StorageLayer(),
tc.ApplicationLayer(0).SQLConn(t, "t"),
tc.ApplicationLayer(0),
tc.StorageLayer(0),
t)

// Replicate the row ranges on all of the first 3 nodes. Save the 4th node in
@@ -85,7 +86,7 @@ func TestSpanResolverUsesCaches(t *testing.T) {
}

// Create a SpanResolver using the 4th node, with empty caches.
s3 := tc.Server(3).ApplicationLayer()
s3 := tc.ApplicationLayer(3)

lr := physicalplan.NewSpanResolver(
s3.ClusterSettings(),
@@ -112,16 +113,15 @@ func TestSpanResolverUsesCaches(t *testing.T) {

// Resolve the spans. Since the range descriptor cache doesn't have any
// leases, all the ranges should be grouped and "assigned" to replica 0.
replicas, err := resolveSpans(context.Background(), lr.NewSpanResolverIterator(nil, nil), spans...)
replicas, err := resolveSpans(ctx, lr.NewSpanResolverIterator(nil, nil), spans...)
if err != nil {
t.Fatal(err)
}
if len(replicas) != 3 {
t.Fatalf("expected replies for 3 spans, got %d", len(replicas))
}
si := tc.Servers[0]

storeID := si.GetFirstStoreID()
storeID := tc.Servers[0].GetFirstStoreID()
for i := 0; i < 3; i++ {
if len(replicas[i]) != 1 {
t.Fatalf("expected 1 range for span %s, got %d",
@@ -134,12 +134,19 @@ func TestSpanResolverUsesCaches(t *testing.T) {
}
}

// Now populate the cached on node 4 and query again. This time, we expect to see
// each span on its own range.
if err := populateCache(tc.Conns[3], 3 /* expectedNumRows */); err != nil {
// Now populate the cache on node 4 and query again. Note that this way of
// populating the range cache is the reason why this test is disabled
// with the default test tenant (#108763).
numExpectedRows := len(rowRanges)
var numRows int
err = s3.SQLConn(t, "t").QueryRow(`SELECT count(1) FROM test`).Scan(&numRows)
if err != nil {
t.Fatal(err)
}
replicas, err = resolveSpans(context.Background(), lr.NewSpanResolverIterator(nil, nil), spans...)
if numRows != numExpectedRows {
t.Fatal(errors.Errorf("expected %d rows, got %d", numExpectedRows, numRows))
}
replicas, err = resolveSpans(ctx, lr.NewSpanResolverIterator(nil, nil), spans...)
if err != nil {
t.Fatal(err)
}
@@ -153,20 +160,6 @@ func TestSpanResolverUsesCaches(t *testing.T) {
}
}

// populateCache runs a scan over a whole table to populate the range cache and
// the lease holder cache of the server to which db is connected.
func populateCache(db *gosql.DB, expectedNumRows int) error {
var numRows int
err := db.QueryRow(`SELECT count(1) FROM test`).Scan(&numRows)
if err != nil {
return err
}
if numRows != expectedNumRows {
return errors.Errorf("expected %d rows, got %d", expectedNumRows, numRows)
}
return nil
}

// splitRangeAtVal splits the range for a table with schema
// `CREATE TABLE test (k INT PRIMARY KEY)` at row with value pk (the row will be
// the first on the right of the split).