Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvstreamer, row: adjust all tests to work with the test tenant #108886

Merged
merged 2 commits into from
Aug 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/kv/kvclient/kvstreamer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,15 @@ go_test(
"main_test.go",
"requests_provider_test.go",
"results_buffer_test.go",
"streamer_accounting_test.go",
"streamer_disabled_test.go",
"streamer_test.go",
],
args = ["-test.timeout=295s"],
embed = [":kvstreamer"],
deps = [
"//pkg/base",
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/kvclient/kvcoord",
"//pkg/kv/kvpb",
Expand Down
6 changes: 4 additions & 2 deletions pkg/kv/kvclient/kvstreamer/avg_response_estimator.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@ type avgResponseEstimator struct {
}

const (
// InitialAvgResponseSize is the initial estimate of the size of a single
// response.
// TODO(yuzefovich): use the optimizer-driven estimates.
initialAvgResponseSize = 1 << 10 // 1KiB
InitialAvgResponseSize = 1 << 10 // 1KiB
// This value was determined using tpchvec/bench test on all TPC-H queries.
defaultAvgResponseSizeMultiple = 1.5
)
Expand Down Expand Up @@ -64,7 +66,7 @@ func (e *avgResponseEstimator) init(sv *settings.Values) {
// Scans.
func (e *avgResponseEstimator) getAvgResponseSize() int64 {
if e.numRequestsStarted == 0 {
return initialAvgResponseSize
return InitialAvgResponseSize
}
// We're estimating the response size as the average over the received
// responses. Importantly, we divide the total responses' footprint by the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func TestAvgResponseEstimator(t *testing.T) {
e := avgResponseEstimator{avgResponseSizeMultiple: defaultAvgResponseSizeMultiple}

// Before receiving any responses, we should be using the initial estimate.
require.Equal(t, int64(initialAvgResponseSize), e.getAvgResponseSize())
require.Equal(t, int64(InitialAvgResponseSize), e.getAvgResponseSize())

// Simulate receiving a single response.
firstResponseSize := int64(42)
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvclient/kvstreamer/large_keys_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ func TestLargeKeys(t *testing.T) {
})
ctx := context.Background()
defer s.Stopper().Stop(ctx)
sql.SecondaryTenantSplitAtEnabled.Override(ctx, &s.ApplicationLayer().ClusterSettings().SV, true)

// We will lower the distsql_workmem limit so that we can operate with
// smaller blobs.
Expand Down
176 changes: 176 additions & 0 deletions pkg/kv/kvclient/kvstreamer/streamer_accounting_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
// Copyright 2023 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package kvstreamer

import (
"context"
"fmt"
"math"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/stretchr/testify/require"
)

// TestStreamerMemoryAccounting performs sanity checking on the memory
// accounting done by the streamer.
func TestStreamerMemoryAccounting(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

s, db, _ := serverutils.StartServer(t, base.TestServerArgs{})
ctx := context.Background()
defer s.Stopper().Stop(ctx)
codec := s.ApplicationLayer().Codec()

// Create a table (for which we know the encoding of valid keys) with a
// single row.
_, err := db.Exec("CREATE TABLE t (pk PRIMARY KEY, k) AS VALUES (0, 0)")
require.NoError(t, err)

// Obtain the TableID.
r := db.QueryRow("SELECT 't'::regclass::oid")
var tableID int
require.NoError(t, r.Scan(&tableID))

makeGetRequest := func(key int) kvpb.RequestUnion {
var res kvpb.RequestUnion
var get kvpb.GetRequest
var union kvpb.RequestUnion_Get
makeKey := func(pk int) []byte {
// These numbers essentially make a key like '/t/primary/key/0'.
return append(codec.IndexPrefix(uint32(tableID), 1), []byte{byte(136 + pk), 136}...)
}
get.Key = makeKey(key)
union.Get = &get
res.Value = &union
return res
}

monitor := mon.NewMonitor(
"streamer", /* name */
mon.MemoryResource,
nil, /* curCount */
nil, /* maxHist */
-1, /* increment */
math.MaxInt64, /* noteworthy */
cluster.MakeTestingClusterSettings(),
)
monitor.Start(ctx, nil /* pool */, mon.NewStandaloneBudget(math.MaxInt64))
defer monitor.Stop(ctx)
acc := monitor.MakeBoundAccount()
defer acc.Close(ctx)

getStreamer := func(singleRowLookup bool) *Streamer {
require.Zero(t, acc.Used())
rootTxn := kv.NewTxn(ctx, s.DB(), s.NodeID())
leafInputState, err := rootTxn.GetLeafTxnInputState(ctx)
if err != nil {
panic(err)
}
s := NewStreamer(
s.DistSenderI().(*kvcoord.DistSender),
s.Stopper(),
kv.NewLeafTxn(ctx, s.DB(), s.NodeID(), leafInputState),
cluster.MakeTestingClusterSettings(),
lock.WaitPolicy(0),
math.MaxInt64,
&acc,
nil, /* kvPairsRead */
nil, /* batchRequestsIssued */
lock.None,
)
s.Init(OutOfOrder, Hints{UniqueRequests: true, SingleRowLookup: singleRowLookup}, 1 /* maxKeysPerRow */, nil /* diskBuffer */)
return s
}

t.Run("get", func(t *testing.T) {
acc.Clear(ctx)
// SingleRowLookup hint only influences the accounting when at least
// one Scan request is present.
streamer := getStreamer(false /* singleRowLookup */)
defer streamer.Close(ctx)

// Get the row with pk=0.
reqs := make([]kvpb.RequestUnion, 1)
reqs[0] = makeGetRequest(0)
require.NoError(t, streamer.Enqueue(ctx, reqs))
results, err := streamer.GetResults(ctx)
require.NoError(t, err)
require.Equal(t, 1, len(results))
// 7 is the number of bytes in GetResponse.Value.RawBytes.
var expectedMemToken = getResponseOverhead + 7
require.Equal(t, expectedMemToken, results[0].memoryTok.toRelease)
var expectedUsed = expectedMemToken + resultSize
require.Equal(t, expectedUsed, acc.Used())
})

for _, singleRowLookup := range []bool{false, true} {
t.Run(fmt.Sprintf("scan/single_row_lookup=%t", singleRowLookup), func(t *testing.T) {
acc.Clear(ctx)
streamer := getStreamer(singleRowLookup)
defer streamer.Close(ctx)

// Scan the row with pk=0.
reqs := make([]kvpb.RequestUnion, 1)
reqs[0] = makeScanRequest(codec, uint32(tableID), 0, 1)
require.NoError(t, streamer.Enqueue(ctx, reqs))
results, err := streamer.GetResults(ctx)
require.NoError(t, err)
require.Equal(t, 1, len(results))
// 29 is usually the number of bytes in
// ScanResponse.BatchResponse[0]. We choose to hard-code this number
// rather than consult NumBytes field directly as an additional
// sanity-check. We also adjust the estimate to account for possible
// tenant prefix.
expectedMemToken := scanResponseOverhead + 29 + int64(len(codec.TenantPrefix()))
if results[0].ScanResp.NumBytes == 33+int64(len(codec.TenantPrefix())) {
// For some reason, sometimes it's not 29, but 33, and we do
// allow for this.
expectedMemToken += 4
}
require.Equal(t, expectedMemToken, results[0].memoryTok.toRelease)
expectedUsed := expectedMemToken + resultSize
if !singleRowLookup {
// This is streamer.numRangesPerScanRequestAccountedFor which is
// only non-zero when SingleRowLookup hint is false.
expectedUsed += 4
}
require.Equal(t, expectedUsed, acc.Used())
})
}
}

func makeScanRequest(codec keys.SQLCodec, tableID uint32, start, end int) kvpb.RequestUnion {
var res kvpb.RequestUnion
var scan kvpb.ScanRequest
var union kvpb.RequestUnion_Scan
makeKey := func(pk int) []byte {
// These numbers essentially make a key like '/t/primary/pk'.
return append(codec.IndexPrefix(tableID, 1), byte(136+pk))
}
scan.Key = makeKey(start)
scan.EndKey = makeKey(end)
scan.ScanFormat = kvpb.BATCH_RESPONSE
union.Scan = &scan
res.Value = &union
return res
}
Loading