diff --git a/build/bazelutil/bazel-generate.sh b/build/bazelutil/bazel-generate.sh index 24f572bae8a6..a5fede3df6a0 100755 --- a/build/bazelutil/bazel-generate.sh +++ b/build/bazelutil/bazel-generate.sh @@ -53,7 +53,7 @@ fi bazel run //:gazelle -if files_unchanged_from_upstream $(find -name BUILD.bazel) $(find -name '*.bzl'); then +if files_unchanged_from_upstream $(find ./pkg -name BUILD.bazel) $(find ./pkg -name '*.bzl'); then echo "Skipping //pkg/cmd/generate-test-suites (relevant files are unchanged from upstream)." else CONTENTS=$(bazel run //pkg/cmd/generate-test-suites --run_under="cd $PWD && ") diff --git a/dev b/dev index 31d5ef8587e0..712627ad5f09 100755 --- a/dev +++ b/dev @@ -4,17 +4,13 @@ set -uo pipefail this_dir=$(cd "$(dirname "$0")" && pwd) mkdir -p $this_dir/artifacts -bazel build //pkg/cmd/dev &> $this_dir/artifacts/dev.log +bazel build //pkg/cmd/dev --config nonogo &> $this_dir/artifacts/dev.log status=$? if [ $status -eq 0 ] then - $(bazel info bazel-bin)/pkg/cmd/dev/dev_/dev "$@" + $(bazel info bazel-bin --config nonogo)/pkg/cmd/dev/dev_/dev "$@" else echo 'Failed to build pkg/cmd/dev! Got output:' cat $this_dir/artifacts/dev.log - echo 'Hint: if the full `dev` build is failing for you, you can build a minimal version with --config nonogo.' - echo 'Afterward, run `dev doctor` to debug your failing build. For example:' - echo ' bazel build pkg/cmd/dev --config nonogo && _bazel/bin/pkg/cmd/dev/dev_/dev doctor' - echo 'When `dev doctor` says you are ready to build, try re-building the full binary with `./dev`.' exit $status fi diff --git a/pkg/ccl/backupccl/BUILD.bazel b/pkg/ccl/backupccl/BUILD.bazel index 2ee0a2008567..72a294fd54c9 100644 --- a/pkg/ccl/backupccl/BUILD.bazel +++ b/pkg/ccl/backupccl/BUILD.bazel @@ -15,6 +15,7 @@ go_library( "create_scheduled_backup.go", "key_rewriter.go", "manifest_handling.go", + "memory_utils.go", "restoration_data.go", "restore_data_processor.go", "restore_job.go", @@ -110,6 +111,7 @@ go_library( "//pkg/util/log", "//pkg/util/log/eventpb", "//pkg/util/metric", + "//pkg/util/mon", "//pkg/util/protoutil", "//pkg/util/retry", "//pkg/util/stop", @@ -226,6 +228,7 @@ go_test( "//pkg/util/hlc", "//pkg/util/leaktest", "//pkg/util/log", + "//pkg/util/mon", "//pkg/util/protoutil", "//pkg/util/randutil", "//pkg/util/retry", diff --git a/pkg/ccl/backupccl/backup_processor.go b/pkg/ccl/backupccl/backup_processor.go index 8cf8c843243c..fe179f68e419 100644 --- a/pkg/ccl/backupccl/backup_processor.go +++ b/pkg/ccl/backupccl/backup_processor.go @@ -115,6 +115,9 @@ type backupDataProcessor struct { cancelAndWaitForWorker func() progCh chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress backupErr error + + // Memory accumulator that reserves the memory usage of the backup processor. + backupMem *memoryAccumulator } var _ execinfra.Processor = &backupDataProcessor{} @@ -127,11 +130,18 @@ func newBackupDataProcessor( post *execinfrapb.PostProcessSpec, output execinfra.RowReceiver, ) (execinfra.Processor, error) { + memMonitor := flowCtx.Cfg.BackupMonitor + if knobs, ok := flowCtx.TestingKnobs().BackupRestoreTestingKnobs.(*sql.BackupRestoreTestingKnobs); ok { + if knobs.BackupMemMonitor != nil { + memMonitor = knobs.BackupMemMonitor + } + } bp := &backupDataProcessor{ - flowCtx: flowCtx, - spec: spec, - output: output, - progCh: make(chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress), + flowCtx: flowCtx, + spec: spec, + output: output, + progCh: make(chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress), + backupMem: newMemoryAccumulator(memMonitor), } if err := bp.Init(bp, post, backupOutputTypes, flowCtx, processorID, output, nil, /* memMonitor */ execinfra.ProcStateOpts{ @@ -160,7 +170,7 @@ func (bp *backupDataProcessor) Start(ctx context.Context) { TaskName: "backup-worker", SpanOpt: stop.ChildSpan, }, func(ctx context.Context) { - bp.backupErr = runBackupProcessor(ctx, bp.flowCtx, &bp.spec, bp.progCh) + bp.backupErr = runBackupProcessor(ctx, bp.flowCtx, &bp.spec, bp.progCh, bp.backupMem) cancel() close(bp.progCh) }); err != nil { @@ -196,6 +206,7 @@ func (bp *backupDataProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.Producer func (bp *backupDataProcessor) close() { bp.cancelAndWaitForWorker() bp.ProcessorBase.InternalClose() + bp.backupMem.close(bp.Ctx) } // ConsumerClosed is part of the RowSource interface. We have to override the @@ -227,6 +238,7 @@ func runBackupProcessor( flowCtx *execinfra.FlowCtx, spec *execinfrapb.BackupDataSpec, progCh chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress, + backupMem *memoryAccumulator, ) error { backupProcessorSpan := tracing.SpanFromContext(ctx) clusterSettings := flowCtx.Cfg.Settings @@ -525,7 +537,10 @@ func runBackupProcessor( return err } - sink := &sstSink{conf: sinkConf, dest: storage} + sink, err := makeSSTSink(ctx, sinkConf, storage, backupMem) + if err != nil { + return err + } defer func() { err := sink.Close() @@ -579,6 +594,20 @@ type sstSink struct { sizeFlushes int spanGrows int } + + backupMem *memoryAccumulator +} + +func makeSSTSink( + ctx context.Context, conf sstSinkConf, dest cloud.ExternalStorage, backupMem *memoryAccumulator, +) (*sstSink, error) { + s := &sstSink{conf: conf, dest: dest, backupMem: backupMem} + + // Reserve memory for the file buffer. + if err := s.backupMem.request(ctx, smallFileBuffer.Get(s.conf.settings)); err != nil { + return nil, errors.Wrap(err, "failed to reserve memory for sstSink queue") + } + return s, nil } func (s *sstSink) Close() error { @@ -589,6 +618,10 @@ func (s *sstSink) Close() error { if s.cancel != nil { s.cancel() } + + // Release the memory reserved for the file buffer back to the memory + // accumulator. + s.backupMem.release(smallFileBuffer.Get(s.conf.settings)) if s.out != nil { return s.out.Close() } @@ -696,6 +729,7 @@ func (s *sstSink) open(ctx context.Context) error { } s.out = w s.sst = storage.MakeBackupSSTWriter(s.out) + return nil } diff --git a/pkg/ccl/backupccl/backup_test.go b/pkg/ccl/backupccl/backup_test.go index 92b72ba49317..5c4f57fe885a 100644 --- a/pkg/ccl/backupccl/backup_test.go +++ b/pkg/ccl/backupccl/backup_test.go @@ -17,6 +17,7 @@ import ( "hash/crc32" "io" "io/ioutil" + "math" "math/rand" "net/url" "os" @@ -74,6 +75,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/mon" "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/randutil" "github.com/cockroachdb/cockroach/pkg/util/retry" @@ -9012,6 +9014,49 @@ CREATE SCHEMA db.s; sqlDB.Exec(t, `BACKUP DATABASE db TO 'nodelocal://0/test/2'`) } +func TestBackupMemMonitorSSTSinkQueueSize(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + memoryMonitor := mon.NewMonitor( + "test-mem", + mon.MemoryResource, + nil, /* curCount */ + nil, /* maxHist */ + -1, /* increment */ + math.MaxInt64, /* noteworthy */ + cluster.MakeTestingClusterSettings(), + ) + ctx := context.Background() + byteLimit := 14 << 20 // 14 MiB + memoryMonitor.Start(ctx, nil, mon.MakeStandaloneBudget(int64(byteLimit))) + defer memoryMonitor.Stop(ctx) + params := base.TestClusterArgs{} + knobs := base.TestingKnobs{ + DistSQL: &execinfra.TestingKnobs{ + BackupRestoreTestingKnobs: &sql.BackupRestoreTestingKnobs{ + BackupMemMonitor: memoryMonitor, + }}, + } + params.ServerArgs.Knobs = knobs + + const numAccounts = 100 + + _, _, sqlDB, _, cleanup := backupRestoreTestSetupWithParams(t, singleNode, numAccounts, + InitManualReplication, params) + defer cleanup() + + // Run a backup and expect the Grow() for the sstSink to return a memory error + // since the default queue byte size is 16MiB. + sqlDB.ExpectErr(t, "failed to reserve memory for sstSink queue", `BACKUP INTO 'nodelocal://0/foo'`) + + // Reduce the queue byte size cluster setting. + sqlDB.Exec(t, `SET CLUSTER SETTING bulkio.backup.merge_file_buffer_size = '13MiB'`) + + // Now the backup should succeed because it is below the `byteLimit`. + sqlDB.Exec(t, `BACKUP INTO 'nodelocal://0/bar'`) +} + // TestBackupRestoreSeperateIncrementalPrefix tests that a backup/restore round // trip using the 'incremental_storage' parameter restores the same db as a BR // round trip without the parameter. diff --git a/pkg/ccl/backupccl/full_cluster_backup_restore_test.go b/pkg/ccl/backupccl/full_cluster_backup_restore_test.go index ae2e25f4db37..bbf96d347bca 100644 --- a/pkg/ccl/backupccl/full_cluster_backup_restore_test.go +++ b/pkg/ccl/backupccl/full_cluster_backup_restore_test.go @@ -923,6 +923,10 @@ func TestReintroduceOfflineSpans(t *testing.T) { // and not report any progress in the meantime unless it is disabled. srcDB.Exec(t, `SET CLUSTER SETTING bulkio.backup.file_size = '1'`) + // Test servers only have 128MB root memory monitors, reduce the buffer size + // so we don't see memory errors. + srcDB.Exec(t, `SET CLUSTER SETTING bulkio.backup.merge_file_buffer_size = '1MiB'`) + // Take a backup that we'll use to create an OFFLINE descriptor. srcDB.Exec(t, `CREATE INDEX new_idx ON data.bank (balance)`) srcDB.Exec(t, `BACKUP DATABASE data TO $1 WITH revision_history`, dbBackupLoc) diff --git a/pkg/ccl/backupccl/memory_utils.go b/pkg/ccl/backupccl/memory_utils.go new file mode 100644 index 000000000000..276fdda34ce4 --- /dev/null +++ b/pkg/ccl/backupccl/memory_utils.go @@ -0,0 +1,68 @@ +// Copyright 2017 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package backupccl + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/util/mon" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" +) + +// memoryAccumulator is a thin wrapper around a BoundAccount that only releases memory +// from the bound account when it is closed, otherwise it accumulates and +// re-uses resources. +// This is useful when resources, once accumulated should not be returned as +// they may be needed later to make progress. +// It is safe for concurrent use. +type memoryAccumulator struct { + syncutil.Mutex + ba mon.BoundAccount + reserved int64 +} + +// newMemoryAccumulator creates a new accumulator backed by a bound account created +// from the given memory monitor. +func newMemoryAccumulator(mm *mon.BytesMonitor) *memoryAccumulator { + return &memoryAccumulator{ba: mm.MakeBoundAccount()} +} + +// request checks that the given number of bytes is available, requesting some +// from the backing monitor if necessary. +func (acc *memoryAccumulator) request(ctx context.Context, requested int64) error { + acc.Lock() + defer acc.Unlock() + + if acc.reserved >= requested { + acc.reserved -= requested + return nil + } + + requested -= acc.reserved + acc.reserved = 0 + + return acc.ba.Grow(ctx, requested) +} + +// release releases a number of bytes back into the internal reserved pool. +func (acc *memoryAccumulator) release(released int64) { + acc.Lock() + defer acc.Unlock() + + acc.reserved += released +} + +// close returns all accumulated memory to the backing monitor. +func (acc *memoryAccumulator) close(ctx context.Context) { + acc.Lock() + defer acc.Unlock() + + acc.reserved = 0 + acc.ba.Close(ctx) +} diff --git a/pkg/ccl/backupccl/testdata/backup-restore/column-families b/pkg/ccl/backupccl/testdata/backup-restore/column-families index bf16406c28d8..bec83fd05b79 100644 --- a/pkg/ccl/backupccl/testdata/backup-restore/column-families +++ b/pkg/ccl/backupccl/testdata/backup-restore/column-families @@ -10,6 +10,7 @@ ALTER TABLE cfs SPLIT AT SELECT a FROM cfs; -- Split the output files very small to catch output SSTs mid-row. SET CLUSTER SETTING bulkio.backup.file_size = '1'; SET CLUSTER SETTING kv.bulk_sst.target_size = '1'; +SET CLUSTER SETTING bulkio.backup.merge_file_buffer_size = '1MiB'; BACKUP cfs TO 'nodelocal://1/foo'; CREATE DATABASE r1; RESTORE cfs FROM 'nodelocal://1/foo' WITH into_db='r1'; diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel index 126c615ad653..f745184a7e72 100644 --- a/pkg/ccl/changefeedccl/BUILD.bazel +++ b/pkg/ccl/changefeedccl/BUILD.bazel @@ -80,6 +80,7 @@ go_library( "//pkg/util", "//pkg/util/bitarray", "//pkg/util/bufalloc", + "//pkg/util/cache", "//pkg/util/ctxgroup", "//pkg/util/duration", "//pkg/util/encoding", diff --git a/pkg/ccl/changefeedccl/rowfetcher_cache.go b/pkg/ccl/changefeedccl/rowfetcher_cache.go index e0407c683212..57a3cde2f889 100644 --- a/pkg/ccl/changefeedccl/rowfetcher_cache.go +++ b/pkg/ccl/changefeedccl/rowfetcher_cache.go @@ -23,6 +23,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/rowenc" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/util" + "github.com/cockroachdb/cockroach/pkg/util/cache" "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/hlc" ) @@ -35,7 +36,7 @@ import ( type rowFetcherCache struct { codec keys.SQLCodec leaseMgr *lease.Manager - fetchers map[idVersion]*row.Fetcher + fetchers *cache.UnorderedCache collection *descs.Collection db *kv.DB @@ -43,6 +44,14 @@ type rowFetcherCache struct { a rowenc.DatumAlloc } +var rfCacheConfig = cache.Config{ + Policy: cache.CacheFIFO, + // TODO: If we find ourselves thrashing here in changefeeds on many tables, + // we can improve performance by eagerly evicting versions using Resolved notifications. + // A old version with a timestamp entirely before a notification can be safely evicted. + ShouldEvict: func(size int, _ interface{}, _ interface{}) bool { return size > 1024 }, +} + type idVersion struct { id descpb.ID version descpb.DescriptorVersion @@ -60,7 +69,7 @@ func newRowFetcherCache( leaseMgr: leaseMgr, collection: cf.NewCollection(nil /* TemporarySchemaProvider */), db: db, - fetchers: make(map[idVersion]*row.Fetcher), + fetchers: cache.NewUnorderedCache(rfCacheConfig), } } @@ -138,10 +147,13 @@ func (c *rowFetcherCache) RowFetcherForTableDesc( // UserDefinedTypeColsHaveSameVersion if we have a hit because we are // guaranteed that the tables have the same version. Additionally, these // fetchers are always initialized with a single tabledesc.Immutable. - if rf, ok := c.fetchers[idVer]; ok && - catalog.UserDefinedTypeColsHaveSameVersion(tableDesc, rf.GetTable().(catalog.TableDescriptor)) { - return rf, nil + if v, ok := c.fetchers.Get(idVer); ok { + rf := v.(*row.Fetcher) + if catalog.UserDefinedTypeColsHaveSameVersion(tableDesc, rf.GetTable().(catalog.TableDescriptor)) { + return rf, nil + } } + // TODO(dan): Allow for decoding a subset of the columns. var colIdxMap catalog.TableColMap var valNeededForCol util.FastIntSet @@ -177,9 +189,6 @@ func (c *rowFetcherCache) RowFetcherForTableDesc( // Necessary because virtual columns are not populated. rf.IgnoreUnexpectedNulls = true - // TODO(dan): Bound the size of the cache. Resolved notifications will let - // us evict anything for timestamps entirely before the notification. Then - // probably an LRU just in case? - c.fetchers[idVer] = &rf + c.fetchers.Add(idVer, &rf) return &rf, nil } diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index aeb820d71111..112608642430 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -446,6 +446,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { bulkMemoryMonitor.Start(context.Background(), rootSQLMemoryMonitor, mon.BoundAccount{}) backfillMemoryMonitor := execinfra.NewMonitor(ctx, bulkMemoryMonitor, "backfill-mon") + backupMemoryMonitor := execinfra.NewMonitor(ctx, bulkMemoryMonitor, "backup-mon") serverCacheMemoryMonitor := mon.NewMonitorInheritWithLimit( "server-cache-mon", 0 /* limit */, rootSQLMemoryMonitor, @@ -542,6 +543,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { VecFDSemaphore: semaphore.New(envutil.EnvOrDefaultInt("COCKROACH_VEC_MAX_OPEN_FDS", colexec.VecMaxOpenFDsLimit)), ParentDiskMonitor: cfg.TempStorageConfig.Mon, BackfillerMonitor: backfillMemoryMonitor, + BackupMonitor: backupMemoryMonitor, ParentMemoryMonitor: rootSQLMemoryMonitor, BulkAdder: func( diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index e31bc32f6155..e1156fab5de3 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -1372,6 +1372,11 @@ type BackupRestoreTestingKnobs struct { // RunAfterExportingSpanEntry allows blocking the BACKUP job after a single // span has been exported. RunAfterExportingSpanEntry func(ctx context.Context, response *roachpb.ExportResponse) + + // BackupMonitor is used to overwrite the monitor used by backup during + // testing. This is typically the bulk mem monitor if not + // specified here. + BackupMemMonitor *mon.BytesMonitor } var _ base.ModuleTestingKnobs = &BackupRestoreTestingKnobs{} diff --git a/pkg/sql/execinfra/server_config.go b/pkg/sql/execinfra/server_config.go index 75feb6fc9503..9eeda739e7d6 100644 --- a/pkg/sql/execinfra/server_config.go +++ b/pkg/sql/execinfra/server_config.go @@ -101,6 +101,10 @@ type ServerConfig struct { // used by the column and index backfillers. BackfillerMonitor *mon.BytesMonitor + // Child monitor of the bulk monitor which will be used to monitor the memory + // used during backup. + BackupMonitor *mon.BytesMonitor + // ParentDiskMonitor is normally the root disk monitor. It should only be used // when setting up a server, a child monitor (usually belonging to a sql // execution flow), or in tests. It is used to monitor temporary storage disk