Skip to content

Commit

Permalink
Merge #108825
Browse files Browse the repository at this point in the history
108825: physicalplan: make a couple of tests work with test tenant r=yuzefovich a=yuzefovich

Two out of three tests tracked by #108763 required only a minor adjustment. However, one of the tests exposed (perhaps previously unknown) difference between single-tenant and multi-tenant configs in terms of populating the range cache. The issue has been repurposed to tracking investigating and addressing that difference.

Addresses: #108763

Epic: None

Release note: None

Co-authored-by: Yahor Yuzefovich <[email protected]>
  • Loading branch information
craig[bot] and yuzefovich committed Aug 16, 2023
2 parents 02cdbba + 517cea3 commit 75c6056
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 56 deletions.
29 changes: 12 additions & 17 deletions pkg/sql/physicalplan/aggregator_funcs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,11 @@ var (
// Any errors stop the current test.
func runTestFlow(
t *testing.T,
srv serverutils.TestServerInterface,
ts serverutils.ApplicationLayerInterface,
txn *kv.Txn,
procs ...execinfrapb.ProcessorSpec,
) (rowenc.EncDatumRows, error) {
distSQLSrv := srv.DistSQLServer().(*distsql.ServerImpl)
distSQLSrv := ts.DistSQLServer().(*distsql.ServerImpl)

leafInputState, err := txn.GetLeafTxnInputState(context.Background())
if err != nil {
Expand Down Expand Up @@ -124,7 +124,7 @@ func runTestFlow(
func checkDistAggregationInfo(
ctx context.Context,
t *testing.T,
srv serverutils.TestServerInterface,
ts serverutils.ApplicationLayerInterface,
tableDesc catalog.TableDescriptor,
colIndexes []int,
numRows int,
Expand All @@ -146,17 +146,17 @@ func checkDistAggregationInfo(
Spans: make([]roachpb.Span, 1),
}
if err := rowenc.InitIndexFetchSpec(
&tr.FetchSpec, srv.Codec(), tableDesc, tableDesc.GetPrimaryIndex(), columnIDs,
&tr.FetchSpec, ts.Codec(), tableDesc, tableDesc.GetPrimaryIndex(), columnIDs,
); err != nil {
t.Fatal(err)
}

var err error
tr.Spans[0].Key, err = randgen.TestingMakePrimaryIndexKeyForTenant(tableDesc, srv.Codec(), startPK)
tr.Spans[0].Key, err = randgen.TestingMakePrimaryIndexKeyForTenant(tableDesc, ts.Codec(), startPK)
if err != nil {
t.Fatal(err)
}
tr.Spans[0].EndKey, err = randgen.TestingMakePrimaryIndexKeyForTenant(tableDesc, srv.Codec(), endPK)
tr.Spans[0].EndKey, err = randgen.TestingMakePrimaryIndexKeyForTenant(tableDesc, ts.Codec(), endPK)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -217,7 +217,7 @@ func checkDistAggregationInfo(
varIdxs[i] = i
}

txn := kv.NewTxn(ctx, srv.DB(), srv.NodeID())
txn := kv.NewTxn(ctx, ts.DB(), ts.DistSQLPlanningNodeID())

// First run a flow that aggregates all the rows without any local stages.
nonDistFinalOutputTypes := finalOutputTypes
Expand All @@ -236,7 +236,7 @@ func checkDistAggregationInfo(
}

rowsNonDist, nonDistErr := runTestFlow(
t, srv, txn,
t, ts, txn,
makeTableReader(1, numRows+1, 0),
execinfrapb.ProcessorSpec{
Input: []execinfrapb.InputSyncSpec{{
Expand Down Expand Up @@ -341,7 +341,7 @@ func checkDistAggregationInfo(
}

procs = append(procs, finalProc)
rowsDist, distErr := runTestFlow(t, srv, txn, procs...)
rowsDist, distErr := runTestFlow(t, ts, txn, procs...)

if distErr != nil || nonDistErr != nil {
pgCodeDistErr := pgerror.GetPGCode(distErr)
Expand Down Expand Up @@ -527,9 +527,7 @@ func TestSingleArgumentDistAggregateFunctions(t *testing.T) {
name := fmt.Sprintf("%s/%s/%d", fn, col.GetName(), numRows)
t.Run(name, func(t *testing.T) {
checkDistAggregationInfo(
// TODO(#76378): pass ts, not srv, here.
context.Background(), t, srv, desc, []int{col.Ordinal()},
numRows, fn, info,
context.Background(), t, ts, desc, []int{col.Ordinal()}, numRows, fn, info,
)
})
}
Expand All @@ -550,9 +548,7 @@ func TestTwoArgumentRegressionAggregateFunctions(t *testing.T) {
defer log.Scope(t).Close(t)
const numRows = 100

srv, db, kvDB := serverutils.StartServer(t, base.TestServerArgs{
DefaultTestTenant: base.TestIsForStuffThatShouldWorkWithSecondaryTenantsButDoesntYet(108763),
})
srv, db, kvDB := serverutils.StartServer(t, base.TestServerArgs{})
defer srv.Stopper().Stop(context.Background())
ts := srv.ApplicationLayer()

Expand Down Expand Up @@ -600,8 +596,7 @@ func TestTwoArgumentRegressionAggregateFunctions(t *testing.T) {
name := fmt.Sprintf("%s/%s-%s/%d", fn, cols[i].GetName(), cols[j].GetName(), numRows)
t.Run(name, func(t *testing.T) {
checkDistAggregationInfo(
// TODO(#76378): pass ts, not srv, here.
context.Background(), t, srv, desc, []int{i, j}, numRows,
context.Background(), t, ts, desc, []int{i, j}, numRows,
fn, info,
)
})
Expand Down
19 changes: 7 additions & 12 deletions pkg/sql/physicalplan/fake_span_resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,14 @@ import (
func TestFakeSpanResolver(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()

tc := serverutils.StartCluster(t, 3, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
DefaultTestTenant: base.TestIsForStuffThatShouldWorkWithSecondaryTenantsButDoesntYet(108763),
},
})
ctx := context.Background()
tc := serverutils.StartCluster(t, 3, base.TestClusterArgs{})
defer tc.Stopper().Stop(ctx)
ts := tc.ApplicationLayer(0)

sqlutils.CreateTable(
t, tc.ServerConn(0), "t",
t, ts.SQLConn(t, ""), "t",
"k INT PRIMARY KEY, v INT",
100,
func(row int) []tree.Datum {
Expand All @@ -55,15 +52,13 @@ func TestFakeSpanResolver(t *testing.T) {

resolver := physicalplanutils.FakeResolverForTestCluster(tc)

db := tc.Server(0).DB()

txn := kv.NewTxn(ctx, db, tc.Server(0).NodeID())
txn := kv.NewTxn(ctx, ts.DB(), ts.DistSQLPlanningNodeID())
it := resolver.NewSpanResolverIterator(txn, nil)

tableDesc := desctestutils.TestingGetPublicTableDescriptor(db, keys.SystemSQLCodec, "test", "t")
tableDesc := desctestutils.TestingGetPublicTableDescriptor(ts.DB(), ts.Codec(), "test", "t")
primIdxValDirs := catalogkeys.IndexKeyValDirs(tableDesc.GetPrimaryIndex())

span := tableDesc.PrimaryIndexSpan(keys.SystemSQLCodec)
span := tableDesc.PrimaryIndexSpan(ts.Codec())

// Make sure we see all the nodes. It will not always happen (due to
// randomness) but it should happen most of the time.
Expand Down
47 changes: 20 additions & 27 deletions pkg/sql/physicalplan/span_resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,20 +39,21 @@ import (
func TestSpanResolverUsesCaches(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
tc := testcluster.StartTestCluster(t, 4,
base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{
DefaultTestTenant: base.TestIsForStuffThatShouldWorkWithSecondaryTenantsButDoesntYet(108763),
UseDatabase: "t",
},
})
defer tc.Stopper().Stop(context.Background())
defer tc.Stopper().Stop(ctx)

rowRanges, _ := setupRanges(
tc.ServerConn(0),
tc.Server(0).ApplicationLayer(),
tc.Server(0).StorageLayer(),
tc.ApplicationLayer(0).SQLConn(t, "t"),
tc.ApplicationLayer(0),
tc.StorageLayer(0),
t)

// Replicate the row ranges on all of the first 3 nodes. Save the 4th node in
Expand Down Expand Up @@ -85,7 +86,7 @@ func TestSpanResolverUsesCaches(t *testing.T) {
}

// Create a SpanResolver using the 4th node, with empty caches.
s3 := tc.Server(3).ApplicationLayer()
s3 := tc.ApplicationLayer(3)

lr := physicalplan.NewSpanResolver(
s3.ClusterSettings(),
Expand All @@ -112,16 +113,15 @@ func TestSpanResolverUsesCaches(t *testing.T) {

// Resolve the spans. Since the range descriptor cache doesn't have any
// leases, all the ranges should be grouped and "assigned" to replica 0.
replicas, err := resolveSpans(context.Background(), lr.NewSpanResolverIterator(nil, nil), spans...)
replicas, err := resolveSpans(ctx, lr.NewSpanResolverIterator(nil, nil), spans...)
if err != nil {
t.Fatal(err)
}
if len(replicas) != 3 {
t.Fatalf("expected replies for 3 spans, got %d", len(replicas))
}
si := tc.Servers[0]

storeID := si.GetFirstStoreID()
storeID := tc.Servers[0].GetFirstStoreID()
for i := 0; i < 3; i++ {
if len(replicas[i]) != 1 {
t.Fatalf("expected 1 range for span %s, got %d",
Expand All @@ -134,12 +134,19 @@ func TestSpanResolverUsesCaches(t *testing.T) {
}
}

// Now populate the cached on node 4 and query again. This time, we expect to see
// each span on its own range.
if err := populateCache(tc.Conns[3], 3 /* expectedNumRows */); err != nil {
// Now populate the cache on node 4 and query again. Note that this way of
// populating the range cache is the reason for why this test is disabled
// with the default test tenant (#108763).
numExpectedRows := len(rowRanges)
var numRows int
err = s3.SQLConn(t, "t").QueryRow(`SELECT count(1) FROM test`).Scan(&numRows)
if err != nil {
t.Fatal(err)
}
replicas, err = resolveSpans(context.Background(), lr.NewSpanResolverIterator(nil, nil), spans...)
if numRows != numExpectedRows {
t.Fatal(errors.Errorf("expected %d rows, got %d", numExpectedRows, numRows))
}
replicas, err = resolveSpans(ctx, lr.NewSpanResolverIterator(nil, nil), spans...)
if err != nil {
t.Fatal(err)
}
Expand All @@ -153,20 +160,6 @@ func TestSpanResolverUsesCaches(t *testing.T) {
}
}

// populateCache runs a scan over a whole table to populate the range cache and
// the lease holder cache of the server to which db is connected.
func populateCache(db *gosql.DB, expectedNumRows int) error {
var numRows int
err := db.QueryRow(`SELECT count(1) FROM test`).Scan(&numRows)
if err != nil {
return err
}
if numRows != expectedNumRows {
return errors.Errorf("expected %d rows, got %d", expectedNumRows, numRows)
}
return nil
}

// splitRangeAtVal splits the range for a table with schema
// `CREATE TABLE test (k INT PRIMARY KEY)` at row with value pk (the row will be
// the first on the right of the split).
Expand Down

0 comments on commit 75c6056

Please sign in to comment.