sql: adjust a couple of memory monitoring tests #100550

Merged 1 commit on Apr 4, 2023
1 change: 1 addition & 0 deletions pkg/sql/BUILD.bazel
@@ -712,6 +712,7 @@ go_test(
"//pkg/build/bazel",
"//pkg/ccl/kvccl/kvtenantccl",
"//pkg/clusterversion",
"//pkg/col/coldata",
"//pkg/config",
"//pkg/config/zonepb",
"//pkg/gossip",
186 changes: 103 additions & 83 deletions pkg/sql/builtin_mem_usage_test.go
@@ -12,112 +12,132 @@ package sql

import (
"context"
gosql "database/sql"
"strings"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
"github.com/lib/pq"
)

// rowSize is the length of the string present in each row of the table created
// by createTableWithLongStrings.
const rowSize = 50000

// numRows is the number of rows to insert in createTableWithLongStrings.
// numRows and rowSize were picked arbitrarily but so that rowSize * numRows >
// lowMemoryBudget, so that aggregating them all in a CONCAT_AGG or
// ARRAY_AGG will exhaust lowMemoryBudget.
const numRows = 100
// TestAggregatesMonitorMemory verifies that the aggregates report their memory
// usage to the memory accounting system. This test works by blocking the query
// with the aggregate when it is in the "draining metadata" state in one
// goroutine and observing the memory monitoring system via
// crdb_internal.node_memory_monitors virtual table in another.
func TestAggregatesMonitorMemory(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// lowMemoryBudget is the memory budget used to test builtins are recording
// their memory use. The budget needs to be large enough to establish the
// initial database connection, but small enough to overflow easily. It's set
// to be comfortably large enough that the server can start up with a bit of
// extra space to overflow.
const lowMemoryBudget = rowSize*numRows - 1
statements := []string{
// By avoiding printing the aggregate results we prevent anything
// besides the aggregate itself from using a lot of memory.
`SELECT length(concat_agg(a)) FROM d.t`,
`SELECT array_length(array_agg(a), 1) FROM d.t`,
`SELECT json_typeof(json_agg(a)) FROM d.t`,
}

// createTableWithLongStrings creates a table with a modest number of long strings,
// with the intention of using them to exhaust a memory budget.
func createTableWithLongStrings(sqlDB *gosql.DB) error {
// blockMainCh is used to block the main goroutine until the worker
// goroutine is trapped by the callback.
blockMainCh := make(chan struct{})
// blockWorkerCh is used to block the worker goroutine until the main
// goroutine checks the memory monitoring state.
blockWorkerCh := make(chan struct{})
s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{
Knobs: base.TestingKnobs{
SQLExecutor: &ExecutorTestingKnobs{
DistSQLReceiverPushCallbackFactory: func(query string) func(rowenc.EncDatumRow, coldata.Batch, *execinfrapb.ProducerMetadata) {
var block bool
for _, testQuery := range statements {
block = block || query == testQuery
}
if !block {
return nil
}
var seenMeta bool
return func(_ rowenc.EncDatumRow, _ coldata.Batch, meta *execinfrapb.ProducerMetadata) {
if meta != nil && !seenMeta {
// If this is the first metadata object, then we
// know that the test query is almost done
// executing, so unblock the main goroutine and then
// wait for that goroutine to signal us to proceed.
blockMainCh <- struct{}{}
<-blockWorkerCh
seenMeta = true
}
}
},
},
},
})
defer s.Stopper().Stop(context.Background())

// Create a table with a modest number of long strings.
if _, err := sqlDB.Exec(`
CREATE DATABASE d;
CREATE TABLE d.t (a STRING)
`); err != nil {
return err
t.Fatal(err)
}

const numRows, rowSize = 100, 50000
for i := 0; i < numRows; i++ {
if _, err := sqlDB.Exec(`INSERT INTO d.t VALUES (repeat('a', $1))`, rowSize); err != nil {
return err
t.Fatal(err)
}
}
return nil
}

// TestConcatAggMonitorsMemory verifies that the aggregates incrementally
// record their memory usage as they build up their result.
func TestAggregatesMonitorMemory(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// By avoiding printing the aggregate results we prevent anything
// besides the aggregate itself from being able to catch the
// large memory usage.
statements := []string{
`SELECT length(concat_agg(a)) FROM d.t`,
`SELECT array_length(array_agg(a), 1) FROM d.t`,
`SELECT json_typeof(json_agg(A)) FROM d.t`,
}
const expectedMemUsage = numRows * rowSize

for _, statement := range statements {
s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{
SQLMemoryPoolSize: lowMemoryBudget,
})

defer s.Stopper().Stop(context.Background())

if err := createTableWithLongStrings(sqlDB); err != nil {
errCh := make(chan error)
go func(statement string) {
dbConn := serverutils.OpenDBConn(
t, s.ServingSQLAddr(), "" /* useDatabase */, false /* insecure */, s.Stopper(),
)
defer dbConn.Close()
_, err := dbConn.Exec(statement)
errCh <- err
}(statement)
// Block this goroutine until the worker is at the end of its query
// execution.
<-blockMainCh
// Now verify that we have at least one memory monitor that uses more
// than the expected memory usage.
rows, err := sqlDB.Query("SELECT name, used FROM crdb_internal.node_memory_monitors")
if err != nil {
t.Fatal(err)
}

_, err := sqlDB.Exec(statement)

if pqErr := (*pq.Error)(nil); !errors.As(err, &pqErr) || pgcode.MakeCode(string(pqErr.Code)) != pgcode.OutOfMemory {
t.Fatalf("Expected \"%s\" to consume too much memory", statement)
}
}
}

func TestEvaluatedMemoryIsChecked(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// We select the LENGTH here and elsewhere because if we passed the result of
// REPEAT up as a result, the memory error would be caught there even if
// REPEAT was not doing its accounting.
testData := []string{
`SELECT length(repeat('abc', 70000000))`,
`SELECT crdb_internal.no_constant_folding(length(repeat('abc', 70000000)))`,
}

for _, statement := range testData {
t.Run("", func(t *testing.T) {
s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{
SQLMemoryPoolSize: lowMemoryBudget,
})
defer s.Stopper().Stop(context.Background())

_, err := sqlDB.Exec(
statement,
)
if pqErr := (*pq.Error)(nil); !errors.As(err, &pqErr) || pgcode.MakeCode(string(pqErr.Code)) != pgcode.ProgramLimitExceeded {
t.Errorf(`expected %q to encounter "requested length too large" error, but it didn't`, statement)
var found bool
for rows.Next() {
var name string
var used int64
if err = rows.Scan(&name, &used); err != nil {
t.Fatal(err)
}
})
log.Infof(context.Background(), "%s: %d", name, used)
// We are likely to not have a separate monitor for the aggregator,
// so instead we look at the flow monitor for the query. "Our" flow
// monitor could be uniquely identified by the FlowID, but we can't
// easily get that information here, so we just assume that if we
// find the monitor for some flow, and it has large enough memory
// usage, then this is "ours" (this assumption sounds reasonable
// since we don't expect internal queries to use this much memory).
if strings.HasPrefix(name, "flow") && used >= expectedMemUsage {
found = true
}
}
blockWorkerCh <- struct{}{}
if err = <-errCh; err != nil {
t.Fatal(err)
}
if err = rows.Err(); err != nil {
t.Fatal(err)
}
if !found {
t.Fatalf("didn't find a memory monitor with at least %d bytes used", expectedMemUsage)
}
}
}
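
For context when reading the new TestAggregatesMonitorMemory: the synchronization it relies on is a plain two-channel handshake. The push callback, running on the query's goroutine, signals the main test goroutine once the query starts draining metadata, then parks until the main goroutine has finished inspecting crdb_internal.node_memory_monitors. The standalone Go sketch below shows only that handshake with placeholder work; it is illustrative, not CockroachDB code.

package main

import "fmt"

func main() {
	// blockMainCh: worker -> main, "I have reached the interesting point"
	// (in the test, the first metadata object pushed by the query).
	blockMainCh := make(chan struct{})
	// blockWorkerCh: main -> worker, "you may finish now".
	blockWorkerCh := make(chan struct{})
	errCh := make(chan error, 1)

	go func() {
		// ... run the query up to the interesting point ...
		blockMainCh <- struct{}{} // unblock the main goroutine
		<-blockWorkerCh           // wait until main has inspected state
		errCh <- nil              // report the query's outcome
	}()

	<-blockMainCh // the worker is now parked at the interesting point
	fmt.Println("inspect memory monitors here while the worker is blocked")
	blockWorkerCh <- struct{}{} // release the worker
	if err := <-errCh; err != nil {
		fmt.Println("query failed:", err)
	}
}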
5 changes: 3 additions & 2 deletions pkg/sql/crdb_internal_test.go
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
@@ -1567,12 +1568,12 @@ func TestVirtualTableDoesntHangOnQueryCanceledError(t *testing.T) {
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
SQLExecutor: &sql.ExecutorTestingKnobs{
DistSQLReceiverPushCallbackFactory: func(query string) func(rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) {
DistSQLReceiverPushCallbackFactory: func(query string) func(rowenc.EncDatumRow, coldata.Batch, *execinfrapb.ProducerMetadata) {
if !addCallback.Load() || strings.HasPrefix(query, sql.SystemJobsAndJobInfoBaseQuery) {
return nil
}
numCallbacksAdded.Add(1)
return func(row rowenc.EncDatumRow, meta *execinfrapb.ProducerMetadata) {
return func(_ rowenc.EncDatumRow, _ coldata.Batch, meta *execinfrapb.ProducerMetadata) {
if meta != nil {
*meta = execinfrapb.ProducerMetadata{}
meta.Err = err
9 changes: 6 additions & 3 deletions pkg/sql/distsql_running.go
@@ -984,8 +984,8 @@ type DistSQLReceiver struct {

testingKnobs struct {
// pushCallback, if set, will be called every time DistSQLReceiver.Push
// is called, with the same arguments.
pushCallback func(rowenc.EncDatumRow, *execinfrapb.ProducerMetadata)
// or DistSQLReceiver.PushBatch is called, with the same arguments.
pushCallback func(rowenc.EncDatumRow, coldata.Batch, *execinfrapb.ProducerMetadata)
}
}

@@ -1413,7 +1413,7 @@ func (r *DistSQLReceiver) Push(
) execinfra.ConsumerStatus {
r.checkConcurrentError()
if r.testingKnobs.pushCallback != nil {
r.testingKnobs.pushCallback(row, meta)
r.testingKnobs.pushCallback(row, nil /* batch */, meta)
}
if meta != nil {
return r.pushMeta(meta)
@@ -1492,6 +1492,9 @@ func (r *DistSQLReceiver) PushBatch(
batch coldata.Batch, meta *execinfrapb.ProducerMetadata,
) execinfra.ConsumerStatus {
r.checkConcurrentError()
if r.testingKnobs.pushCallback != nil {
r.testingKnobs.pushCallback(nil /* row */, batch, meta)
}
if meta != nil {
return r.pushMeta(meta)
}
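
With the knob now shared between Push and PushBatch, at most one of the row/batch arguments is non-nil on any invocation: Push passes (row, nil, meta) and PushBatch passes (nil, batch, meta). Below is a hedged sketch of a callback that dispatches on that convention, using placeholder types rather than the real rowenc.EncDatumRow and coldata.Batch.

package main

import "fmt"

// Row, Batch, and Metadata are placeholders standing in for
// rowenc.EncDatumRow, coldata.Batch, and execinfrapb.ProducerMetadata.
type Row []string
type Batch struct{ length int }
type Metadata struct{ err error }

// pushCallback mirrors the three-argument shape of the updated testing knob.
func pushCallback(row Row, batch *Batch, meta *Metadata) {
	switch {
	case meta != nil:
		fmt.Println("metadata pushed, err:", meta.err)
	case row != nil:
		fmt.Println("row pushed with", len(row), "datums")
	case batch != nil:
		fmt.Println("batch pushed with", batch.length, "rows")
	}
}

func main() {
	pushCallback(Row{"a", "b"}, nil, nil)        // row-based Push
	pushCallback(nil, &Batch{length: 1024}, nil) // vectorized PushBatch
	pushCallback(nil, nil, &Metadata{})          // metadata-only push
}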
5 changes: 3 additions & 2 deletions pkg/sql/distsql_running_test.go
@@ -21,6 +21,7 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
@@ -598,11 +599,11 @@ func TestDistSQLReceiverDrainsMeta(t *testing.T) {
UseDatabase: "test",
Knobs: base.TestingKnobs{
SQLExecutor: &ExecutorTestingKnobs{
DistSQLReceiverPushCallbackFactory: func(query string) func(rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) {
DistSQLReceiverPushCallbackFactory: func(query string) func(rowenc.EncDatumRow, coldata.Batch, *execinfrapb.ProducerMetadata) {
if query != testQuery {
return nil
}
return func(row rowenc.EncDatumRow, meta *execinfrapb.ProducerMetadata) {
return func(_ rowenc.EncDatumRow, _ coldata.Batch, meta *execinfrapb.ProducerMetadata) {
if meta != nil {
accumulatedMeta = append(accumulatedMeta, *meta)
}
4 changes: 2 additions & 2 deletions pkg/sql/exec_util.go
@@ -1590,8 +1590,8 @@ type ExecutorTestingKnobs struct {
// DistSQLReceiverPushCallbackFactory, if set, will be called every time a
// DistSQLReceiver is created for a new query execution, and it should
// return, possibly nil, a callback that will be called every time
// DistSQLReceiver.Push is called.
DistSQLReceiverPushCallbackFactory func(query string) func(rowenc.EncDatumRow, *execinfrapb.ProducerMetadata)
// DistSQLReceiver.Push or DistSQLReceiver.PushBatch is called.
DistSQLReceiverPushCallbackFactory func(query string) func(rowenc.EncDatumRow, coldata.Batch, *execinfrapb.ProducerMetadata)

// OnTxnRetry, if set, will be called if there is a transaction retry.
OnTxnRetry func(autoRetryReason error, evalCtx *eval.Context)
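
A minimal sketch of wiring this factory from a test, assuming an in-repo external test with the usual base/serverutils/leaktest imports plus sync/atomic; the test name and query string are hypothetical, while the knob and its three-argument signature come from the diff above. The factory returns nil for every statement except the one of interest, so other queries pay no observation cost.

func TestPushCallbackKnobSketch(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	// Hypothetical statement to observe; the factory receives the query text
	// and is expected to return nil for everything else.
	const queryOfInterest = `SELECT count(*) FROM system.users`
	var pushes atomic.Int64
	s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{
		Knobs: base.TestingKnobs{
			SQLExecutor: &sql.ExecutorTestingKnobs{
				DistSQLReceiverPushCallbackFactory: func(query string) func(rowenc.EncDatumRow, coldata.Batch, *execinfrapb.ProducerMetadata) {
					if query != queryOfInterest {
						return nil
					}
					return func(_ rowenc.EncDatumRow, _ coldata.Batch, _ *execinfrapb.ProducerMetadata) {
						// Counts both Push and PushBatch invocations.
						pushes.Add(1)
					}
				},
			},
		},
	})
	defer s.Stopper().Stop(context.Background())

	if _, err := sqlDB.Exec(queryOfInterest); err != nil {
		t.Fatal(err)
	}
	if pushes.Load() == 0 {
		t.Fatal("expected the push callback to be invoked at least once")
	}
}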
1 change: 1 addition & 0 deletions pkg/sql/importer/BUILD.bazel
@@ -171,6 +171,7 @@ go_test(
"//pkg/cloud/impl:cloudimpl",
"//pkg/cloud/nodelocal",
"//pkg/cloud/userfile",
"//pkg/col/coldata",
"//pkg/config",
"//pkg/config/zonepb",
"//pkg/jobs",
5 changes: 3 additions & 2 deletions pkg/sql/importer/exportcsv_test.go
@@ -28,6 +28,7 @@ import (
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/config"
"github.com/cockroachdb/cockroach/pkg/config/zonepb"
"github.com/cockroachdb/cockroach/pkg/keys"
@@ -623,9 +624,9 @@ func TestProcessorEncountersUncertaintyError(t *testing.T) {
0: {
Knobs: base.TestingKnobs{
SQLExecutor: &sql.ExecutorTestingKnobs{
DistSQLReceiverPushCallbackFactory: func(query string) func(rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) {
DistSQLReceiverPushCallbackFactory: func(query string) func(rowenc.EncDatumRow, coldata.Batch, *execinfrapb.ProducerMetadata) {
if strings.Contains(query, "EXPORT") {
return func(_ rowenc.EncDatumRow, meta *execinfrapb.ProducerMetadata) {
return func(_ rowenc.EncDatumRow, _ coldata.Batch, meta *execinfrapb.ProducerMetadata) {
if meta != nil && meta.Err != nil {
if testutils.IsError(meta.Err, "ReadWithinUncertaintyIntervalError") {
close(gotRWUIOnGateway)