From 08542b210fee0dcbbdce49c09c04dfe76b692bc3 Mon Sep 17 00:00:00 2001 From: Aaron Zinger Date: Mon, 13 Dec 2021 13:21:26 -0500 Subject: [PATCH 1/4] changefeedccl: limit in-memory rowfetcher cache We generate a row.Fetcher for each new version of a watched table, and cache it in-memory. This is technically an unbounded memory usage and thus could cause an OOM, although in practice you won't hit that unless you are Schema Changes Georg, the hypothetical outlier who makes thousands of schema changes a second. To support Schema Changes Georg, this PR bounds the size of the cache and evicts the oldest record. There's smarter stuff we could do (per an existing TODO) to eagerly evict records using resolved timestamps, but that's an optimization only a small subset of hypothetical Georgs would care about (those running single changefeeds on many tables, all with many rapid schema changes) so I doubt we'll want to bother. Release note: None --- pkg/ccl/changefeedccl/BUILD.bazel | 1 + pkg/ccl/changefeedccl/rowfetcher_cache.go | 27 +++++++++++++++-------- 2 files changed, 19 insertions(+), 9 deletions(-) diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel index 126c615ad653..f745184a7e72 100644 --- a/pkg/ccl/changefeedccl/BUILD.bazel +++ b/pkg/ccl/changefeedccl/BUILD.bazel @@ -80,6 +80,7 @@ go_library( "//pkg/util", "//pkg/util/bitarray", "//pkg/util/bufalloc", + "//pkg/util/cache", "//pkg/util/ctxgroup", "//pkg/util/duration", "//pkg/util/encoding", diff --git a/pkg/ccl/changefeedccl/rowfetcher_cache.go b/pkg/ccl/changefeedccl/rowfetcher_cache.go index e0407c683212..57a3cde2f889 100644 --- a/pkg/ccl/changefeedccl/rowfetcher_cache.go +++ b/pkg/ccl/changefeedccl/rowfetcher_cache.go @@ -23,6 +23,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/rowenc" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/util" + "github.com/cockroachdb/cockroach/pkg/util/cache" "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/hlc" ) @@ -35,7 +36,7 @@ import ( type rowFetcherCache struct { codec keys.SQLCodec leaseMgr *lease.Manager - fetchers map[idVersion]*row.Fetcher + fetchers *cache.UnorderedCache collection *descs.Collection db *kv.DB @@ -43,6 +44,14 @@ type rowFetcherCache struct { a rowenc.DatumAlloc } +var rfCacheConfig = cache.Config{ + Policy: cache.CacheFIFO, + // TODO: If we find ourselves thrashing here in changefeeds on many tables, + // we can improve performance by eagerly evicting versions using Resolved notifications. + // A old version with a timestamp entirely before a notification can be safely evicted. + ShouldEvict: func(size int, _ interface{}, _ interface{}) bool { return size > 1024 }, +} + type idVersion struct { id descpb.ID version descpb.DescriptorVersion @@ -60,7 +69,7 @@ func newRowFetcherCache( leaseMgr: leaseMgr, collection: cf.NewCollection(nil /* TemporarySchemaProvider */), db: db, - fetchers: make(map[idVersion]*row.Fetcher), + fetchers: cache.NewUnorderedCache(rfCacheConfig), } } @@ -138,10 +147,13 @@ func (c *rowFetcherCache) RowFetcherForTableDesc( // UserDefinedTypeColsHaveSameVersion if we have a hit because we are // guaranteed that the tables have the same version. Additionally, these // fetchers are always initialized with a single tabledesc.Immutable. 
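As an aside, a minimal standalone sketch (illustration only, not part of this change) of how the bounded FIFO cache configured above behaves; it uses the same pkg/util/cache API this patch imports, but caps the cache at 3 entries instead of 1024 so the eviction is easy to see:

package main

import (
    "fmt"

    "github.com/cockroachdb/cockroach/pkg/util/cache"
)

func main() {
    // Same shape as rfCacheConfig above, with a tiny cap for demonstration.
    c := cache.NewUnorderedCache(cache.Config{
        Policy:      cache.CacheFIFO,
        ShouldEvict: func(size int, _ interface{}, _ interface{}) bool { return size > 3 },
    })
    for i := 0; i < 5; i++ {
        c.Add(i, fmt.Sprintf("fetcher-%d", i))
    }
    // The two oldest entries were evicted; the three newest remain.
    _, ok := c.Get(0)
    fmt.Println(ok) // false
    _, ok = c.Get(4)
    fmt.Println(ok) // true
}

In the changefeed itself the keys are idVersion values and the cached values are *row.Fetcher instances, so once more than 1024 table versions have been seen, the oldest fetcher is evicted first.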
- if rf, ok := c.fetchers[idVer]; ok && - catalog.UserDefinedTypeColsHaveSameVersion(tableDesc, rf.GetTable().(catalog.TableDescriptor)) { - return rf, nil + if v, ok := c.fetchers.Get(idVer); ok { + rf := v.(*row.Fetcher) + if catalog.UserDefinedTypeColsHaveSameVersion(tableDesc, rf.GetTable().(catalog.TableDescriptor)) { + return rf, nil + } } + // TODO(dan): Allow for decoding a subset of the columns. var colIdxMap catalog.TableColMap var valNeededForCol util.FastIntSet @@ -177,9 +189,6 @@ func (c *rowFetcherCache) RowFetcherForTableDesc( // Necessary because virtual columns are not populated. rf.IgnoreUnexpectedNulls = true - // TODO(dan): Bound the size of the cache. Resolved notifications will let - // us evict anything for timestamps entirely before the notification. Then - // probably an LRU just in case? - c.fetchers[idVer] = &rf + c.fetchers.Add(idVer, &rf) return &rf, nil } From 1ac3e937e9bd9e6fde4f989b5f7b034b01ee143e Mon Sep 17 00:00:00 2001 From: Aditya Maru Date: Tue, 14 Dec 2021 14:02:50 -0500 Subject: [PATCH 2/4] backupccl: add BackupMonitor to memory monitor file stitching This change adds a BackupMonitor that hangs off the bulk memory monitor. This monitor currently only guards the queue that we use to buffer SSTs while stitching them together in the sstSink. Release note: None --- pkg/ccl/backupccl/BUILD.bazel | 3 + pkg/ccl/backupccl/backup_processor.go | 46 +++++++++++-- pkg/ccl/backupccl/backup_test.go | 45 ++++++++++++ .../full_cluster_backup_restore_test.go | 4 ++ pkg/ccl/backupccl/memory_utils.go | 68 +++++++++++++++++++ .../testdata/backup-restore/column-families | 1 + pkg/server/server_sql.go | 2 + pkg/sql/exec_util.go | 5 ++ pkg/sql/execinfra/server_config.go | 4 ++ 9 files changed, 172 insertions(+), 6 deletions(-) create mode 100644 pkg/ccl/backupccl/memory_utils.go diff --git a/pkg/ccl/backupccl/BUILD.bazel b/pkg/ccl/backupccl/BUILD.bazel index 2ee0a2008567..72a294fd54c9 100644 --- a/pkg/ccl/backupccl/BUILD.bazel +++ b/pkg/ccl/backupccl/BUILD.bazel @@ -15,6 +15,7 @@ go_library( "create_scheduled_backup.go", "key_rewriter.go", "manifest_handling.go", + "memory_utils.go", "restoration_data.go", "restore_data_processor.go", "restore_job.go", @@ -110,6 +111,7 @@ go_library( "//pkg/util/log", "//pkg/util/log/eventpb", "//pkg/util/metric", + "//pkg/util/mon", "//pkg/util/protoutil", "//pkg/util/retry", "//pkg/util/stop", @@ -226,6 +228,7 @@ go_test( "//pkg/util/hlc", "//pkg/util/leaktest", "//pkg/util/log", + "//pkg/util/mon", "//pkg/util/protoutil", "//pkg/util/randutil", "//pkg/util/retry", diff --git a/pkg/ccl/backupccl/backup_processor.go b/pkg/ccl/backupccl/backup_processor.go index 8cf8c843243c..fe179f68e419 100644 --- a/pkg/ccl/backupccl/backup_processor.go +++ b/pkg/ccl/backupccl/backup_processor.go @@ -115,6 +115,9 @@ type backupDataProcessor struct { cancelAndWaitForWorker func() progCh chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress backupErr error + + // Memory accumulator that reserves the memory usage of the backup processor. 
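+ // It draws from the server's BackupMonitor (or from the BackupMemMonitor
+ // testing knob when one is set) and is closed when the processor shuts
+ // down, at which point any reserved bytes are returned to the monitor.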
+ backupMem *memoryAccumulator } var _ execinfra.Processor = &backupDataProcessor{} @@ -127,11 +130,18 @@ func newBackupDataProcessor( post *execinfrapb.PostProcessSpec, output execinfra.RowReceiver, ) (execinfra.Processor, error) { + memMonitor := flowCtx.Cfg.BackupMonitor + if knobs, ok := flowCtx.TestingKnobs().BackupRestoreTestingKnobs.(*sql.BackupRestoreTestingKnobs); ok { + if knobs.BackupMemMonitor != nil { + memMonitor = knobs.BackupMemMonitor + } + } bp := &backupDataProcessor{ - flowCtx: flowCtx, - spec: spec, - output: output, - progCh: make(chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress), + flowCtx: flowCtx, + spec: spec, + output: output, + progCh: make(chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress), + backupMem: newMemoryAccumulator(memMonitor), } if err := bp.Init(bp, post, backupOutputTypes, flowCtx, processorID, output, nil, /* memMonitor */ execinfra.ProcStateOpts{ @@ -160,7 +170,7 @@ func (bp *backupDataProcessor) Start(ctx context.Context) { TaskName: "backup-worker", SpanOpt: stop.ChildSpan, }, func(ctx context.Context) { - bp.backupErr = runBackupProcessor(ctx, bp.flowCtx, &bp.spec, bp.progCh) + bp.backupErr = runBackupProcessor(ctx, bp.flowCtx, &bp.spec, bp.progCh, bp.backupMem) cancel() close(bp.progCh) }); err != nil { @@ -196,6 +206,7 @@ func (bp *backupDataProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.Producer func (bp *backupDataProcessor) close() { bp.cancelAndWaitForWorker() bp.ProcessorBase.InternalClose() + bp.backupMem.close(bp.Ctx) } // ConsumerClosed is part of the RowSource interface. We have to override the @@ -227,6 +238,7 @@ func runBackupProcessor( flowCtx *execinfra.FlowCtx, spec *execinfrapb.BackupDataSpec, progCh chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress, + backupMem *memoryAccumulator, ) error { backupProcessorSpan := tracing.SpanFromContext(ctx) clusterSettings := flowCtx.Cfg.Settings @@ -525,7 +537,10 @@ func runBackupProcessor( return err } - sink := &sstSink{conf: sinkConf, dest: storage} + sink, err := makeSSTSink(ctx, sinkConf, storage, backupMem) + if err != nil { + return err + } defer func() { err := sink.Close() @@ -579,6 +594,20 @@ type sstSink struct { sizeFlushes int spanGrows int } + + backupMem *memoryAccumulator +} + +func makeSSTSink( + ctx context.Context, conf sstSinkConf, dest cloud.ExternalStorage, backupMem *memoryAccumulator, +) (*sstSink, error) { + s := &sstSink{conf: conf, dest: dest, backupMem: backupMem} + + // Reserve memory for the file buffer. + if err := s.backupMem.request(ctx, smallFileBuffer.Get(s.conf.settings)); err != nil { + return nil, errors.Wrap(err, "failed to reserve memory for sstSink queue") + } + return s, nil } func (s *sstSink) Close() error { @@ -589,6 +618,10 @@ func (s *sstSink) Close() error { if s.cancel != nil { s.cancel() } + + // Release the memory reserved for the file buffer back to the memory + // accumulator. 
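+ // (The released bytes stay in the accumulator's reserved pool so a later
+ // request can reuse them without growing the bound account again; they are
+ // only returned to the monitor when the accumulator itself is closed.)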
+ s.backupMem.release(smallFileBuffer.Get(s.conf.settings)) if s.out != nil { return s.out.Close() } @@ -696,6 +729,7 @@ func (s *sstSink) open(ctx context.Context) error { } s.out = w s.sst = storage.MakeBackupSSTWriter(s.out) + return nil } diff --git a/pkg/ccl/backupccl/backup_test.go b/pkg/ccl/backupccl/backup_test.go index 92b72ba49317..5c4f57fe885a 100644 --- a/pkg/ccl/backupccl/backup_test.go +++ b/pkg/ccl/backupccl/backup_test.go @@ -17,6 +17,7 @@ import ( "hash/crc32" "io" "io/ioutil" + "math" "math/rand" "net/url" "os" @@ -74,6 +75,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/mon" "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/randutil" "github.com/cockroachdb/cockroach/pkg/util/retry" @@ -9012,6 +9014,49 @@ CREATE SCHEMA db.s; sqlDB.Exec(t, `BACKUP DATABASE db TO 'nodelocal://0/test/2'`) } +func TestBackupMemMonitorSSTSinkQueueSize(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + memoryMonitor := mon.NewMonitor( + "test-mem", + mon.MemoryResource, + nil, /* curCount */ + nil, /* maxHist */ + -1, /* increment */ + math.MaxInt64, /* noteworthy */ + cluster.MakeTestingClusterSettings(), + ) + ctx := context.Background() + byteLimit := 14 << 20 // 14 MiB + memoryMonitor.Start(ctx, nil, mon.MakeStandaloneBudget(int64(byteLimit))) + defer memoryMonitor.Stop(ctx) + params := base.TestClusterArgs{} + knobs := base.TestingKnobs{ + DistSQL: &execinfra.TestingKnobs{ + BackupRestoreTestingKnobs: &sql.BackupRestoreTestingKnobs{ + BackupMemMonitor: memoryMonitor, + }}, + } + params.ServerArgs.Knobs = knobs + + const numAccounts = 100 + + _, _, sqlDB, _, cleanup := backupRestoreTestSetupWithParams(t, singleNode, numAccounts, + InitManualReplication, params) + defer cleanup() + + // Run a backup and expect the Grow() for the sstSink to return a memory error + // since the default queue byte size is 16MiB. + sqlDB.ExpectErr(t, "failed to reserve memory for sstSink queue", `BACKUP INTO 'nodelocal://0/foo'`) + + // Reduce the queue byte size cluster setting. + sqlDB.Exec(t, `SET CLUSTER SETTING bulkio.backup.merge_file_buffer_size = '13MiB'`) + + // Now the backup should succeed because it is below the `byteLimit`. + sqlDB.Exec(t, `BACKUP INTO 'nodelocal://0/bar'`) +} + // TestBackupRestoreSeperateIncrementalPrefix tests that a backup/restore round // trip using the 'incremental_storage' parameter restores the same db as a BR // round trip without the parameter. diff --git a/pkg/ccl/backupccl/full_cluster_backup_restore_test.go b/pkg/ccl/backupccl/full_cluster_backup_restore_test.go index ae2e25f4db37..bbf96d347bca 100644 --- a/pkg/ccl/backupccl/full_cluster_backup_restore_test.go +++ b/pkg/ccl/backupccl/full_cluster_backup_restore_test.go @@ -923,6 +923,10 @@ func TestReintroduceOfflineSpans(t *testing.T) { // and not report any progress in the meantime unless it is disabled. srcDB.Exec(t, `SET CLUSTER SETTING bulkio.backup.file_size = '1'`) + // Test servers only have 128MB root memory monitors, reduce the buffer size + // so we don't see memory errors. + srcDB.Exec(t, `SET CLUSTER SETTING bulkio.backup.merge_file_buffer_size = '1MiB'`) + // Take a backup that we'll use to create an OFFLINE descriptor. 
srcDB.Exec(t, `CREATE INDEX new_idx ON data.bank (balance)`) srcDB.Exec(t, `BACKUP DATABASE data TO $1 WITH revision_history`, dbBackupLoc) diff --git a/pkg/ccl/backupccl/memory_utils.go b/pkg/ccl/backupccl/memory_utils.go new file mode 100644 index 000000000000..276fdda34ce4 --- /dev/null +++ b/pkg/ccl/backupccl/memory_utils.go @@ -0,0 +1,68 @@ +// Copyright 2017 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package backupccl + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/util/mon" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" +) + +// memoryAccumulator is a thin wrapper around a BoundAccount that only releases memory +// from the bound account when it is closed, otherwise it accumulates and +// re-uses resources. +// This is useful when resources, once accumulated should not be returned as +// they may be needed later to make progress. +// It is safe for concurrent use. +type memoryAccumulator struct { + syncutil.Mutex + ba mon.BoundAccount + reserved int64 +} + +// newMemoryAccumulator creates a new accumulator backed by a bound account created +// from the given memory monitor. +func newMemoryAccumulator(mm *mon.BytesMonitor) *memoryAccumulator { + return &memoryAccumulator{ba: mm.MakeBoundAccount()} +} + +// request checks that the given number of bytes is available, requesting some +// from the backing monitor if necessary. +func (acc *memoryAccumulator) request(ctx context.Context, requested int64) error { + acc.Lock() + defer acc.Unlock() + + if acc.reserved >= requested { + acc.reserved -= requested + return nil + } + + requested -= acc.reserved + acc.reserved = 0 + + return acc.ba.Grow(ctx, requested) +} + +// release releases a number of bytes back into the internal reserved pool. +func (acc *memoryAccumulator) release(released int64) { + acc.Lock() + defer acc.Unlock() + + acc.reserved += released +} + +// close returns all accumulated memory to the backing monitor. +func (acc *memoryAccumulator) close(ctx context.Context) { + acc.Lock() + defer acc.Unlock() + + acc.reserved = 0 + acc.ba.Close(ctx) +} diff --git a/pkg/ccl/backupccl/testdata/backup-restore/column-families b/pkg/ccl/backupccl/testdata/backup-restore/column-families index bf16406c28d8..bec83fd05b79 100644 --- a/pkg/ccl/backupccl/testdata/backup-restore/column-families +++ b/pkg/ccl/backupccl/testdata/backup-restore/column-families @@ -10,6 +10,7 @@ ALTER TABLE cfs SPLIT AT SELECT a FROM cfs; -- Split the output files very small to catch output SSTs mid-row. 
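Stepping back to memory_utils.go above, here is a hypothetical usage sketch (the function name and the 1 MiB figure are invented for illustration); it shows why release hands bytes back to the accumulator rather than to the monitor:

package backupccl

import (
    "context"

    "github.com/cockroachdb/cockroach/pkg/util/mon"
)

// memoryAccumulatorUsageSketch is a hypothetical illustration, not part of
// this patch: released bytes stay in the accumulator's reserved pool and
// satisfy later requests without touching the monitor; only close() returns
// them to the backing monitor.
func memoryAccumulatorUsageSketch(ctx context.Context, mm *mon.BytesMonitor) error {
    acc := newMemoryAccumulator(mm)
    defer acc.close(ctx) // returns everything reserved here to the monitor

    // Ask the monitor for 1 MiB.
    if err := acc.request(ctx, 1<<20); err != nil {
        return err // e.g. the monitor's budget is already exhausted
    }
    // Hand the bytes back to the accumulator's internal pool...
    acc.release(1 << 20)
    // ...so this request is served from that pool, with no new Grow() call.
    return acc.request(ctx, 1<<20)
}

This is the lifecycle the sstSink relies on: the buffer reservation taken in makeSSTSink is released back to the accumulator in Close, and only the processor's close returns it to the backup memory monitor.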
SET CLUSTER SETTING bulkio.backup.file_size = '1'; SET CLUSTER SETTING kv.bulk_sst.target_size = '1'; +SET CLUSTER SETTING bulkio.backup.merge_file_buffer_size = '1MiB'; BACKUP cfs TO 'nodelocal://1/foo'; CREATE DATABASE r1; RESTORE cfs FROM 'nodelocal://1/foo' WITH into_db='r1'; diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index aeb820d71111..112608642430 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -446,6 +446,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { bulkMemoryMonitor.Start(context.Background(), rootSQLMemoryMonitor, mon.BoundAccount{}) backfillMemoryMonitor := execinfra.NewMonitor(ctx, bulkMemoryMonitor, "backfill-mon") + backupMemoryMonitor := execinfra.NewMonitor(ctx, bulkMemoryMonitor, "backup-mon") serverCacheMemoryMonitor := mon.NewMonitorInheritWithLimit( "server-cache-mon", 0 /* limit */, rootSQLMemoryMonitor, @@ -542,6 +543,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { VecFDSemaphore: semaphore.New(envutil.EnvOrDefaultInt("COCKROACH_VEC_MAX_OPEN_FDS", colexec.VecMaxOpenFDsLimit)), ParentDiskMonitor: cfg.TempStorageConfig.Mon, BackfillerMonitor: backfillMemoryMonitor, + BackupMonitor: backupMemoryMonitor, ParentMemoryMonitor: rootSQLMemoryMonitor, BulkAdder: func( diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index e31bc32f6155..e1156fab5de3 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -1372,6 +1372,11 @@ type BackupRestoreTestingKnobs struct { // RunAfterExportingSpanEntry allows blocking the BACKUP job after a single // span has been exported. RunAfterExportingSpanEntry func(ctx context.Context, response *roachpb.ExportResponse) + + // BackupMonitor is used to overwrite the monitor used by backup during + // testing. This is typically the bulk mem monitor if not + // specified here. + BackupMemMonitor *mon.BytesMonitor } var _ base.ModuleTestingKnobs = &BackupRestoreTestingKnobs{} diff --git a/pkg/sql/execinfra/server_config.go b/pkg/sql/execinfra/server_config.go index 75feb6fc9503..9eeda739e7d6 100644 --- a/pkg/sql/execinfra/server_config.go +++ b/pkg/sql/execinfra/server_config.go @@ -101,6 +101,10 @@ type ServerConfig struct { // used by the column and index backfillers. BackfillerMonitor *mon.BytesMonitor + // Child monitor of the bulk monitor which will be used to monitor the memory + // used during backup. + BackupMonitor *mon.BytesMonitor + // ParentDiskMonitor is normally the root disk monitor. It should only be used // when setting up a server, a child monitor (usually belonging to a sql // execution flow), or in tests. It is used to monitor temporary storage disk From 3e3f53605ab50e2c69a78566f22af40c2d810769 Mon Sep 17 00:00:00 2001 From: Ricky Stewart Date: Mon, 20 Dec 2021 11:09:44 -0600 Subject: [PATCH 3/4] bazel: pass path argument to `find` The bare `find -name ...` invocation without a path argument fails on some versions of `find`, namely the one installed on macOS machines. 
Release note: None --- build/bazelutil/bazel-generate.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build/bazelutil/bazel-generate.sh b/build/bazelutil/bazel-generate.sh index 24f572bae8a6..a5fede3df6a0 100755 --- a/build/bazelutil/bazel-generate.sh +++ b/build/bazelutil/bazel-generate.sh @@ -53,7 +53,7 @@ fi bazel run //:gazelle -if files_unchanged_from_upstream $(find -name BUILD.bazel) $(find -name '*.bzl'); then +if files_unchanged_from_upstream $(find ./pkg -name BUILD.bazel) $(find ./pkg -name '*.bzl'); then echo "Skipping //pkg/cmd/generate-test-suites (relevant files are unchanged from upstream)." else CONTENTS=$(bazel run //pkg/cmd/generate-test-suites --run_under="cd $PWD && ") From b3db2553cbcf7649714d6594549ece063dd137bd Mon Sep 17 00:00:00 2001 From: Ricky Stewart Date: Mon, 20 Dec 2021 12:05:44 -0600 Subject: [PATCH 4/4] bazel: build `dev` with `--config nonogo` I didn't do this unconditionally because I was afraid it would thrash the cache, but in my testing it seems to not really happen (at least, compiling `dev` with `nonogo` is so fast that it vastly outweighs any invalidation that may be happening). Closes #74004. Release note: None --- dev | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/dev b/dev index 31d5ef8587e0..712627ad5f09 100755 --- a/dev +++ b/dev @@ -4,17 +4,13 @@ set -uo pipefail this_dir=$(cd "$(dirname "$0")" && pwd) mkdir -p $this_dir/artifacts -bazel build //pkg/cmd/dev &> $this_dir/artifacts/dev.log +bazel build //pkg/cmd/dev --config nonogo &> $this_dir/artifacts/dev.log status=$? if [ $status -eq 0 ] then - $(bazel info bazel-bin)/pkg/cmd/dev/dev_/dev "$@" + $(bazel info bazel-bin --config nonogo)/pkg/cmd/dev/dev_/dev "$@" else echo 'Failed to build pkg/cmd/dev! Got output:' cat $this_dir/artifacts/dev.log - echo 'Hint: if the full `dev` build is failing for you, you can build a minimal version with --config nonogo.' - echo 'Afterward, run `dev doctor` to debug your failing build. For example:' - echo ' bazel build pkg/cmd/dev --config nonogo && _bazel/bin/pkg/cmd/dev/dev_/dev doctor' - echo 'When `dev doctor` says you are ready to build, try re-building the full binary with `./dev`.' exit $status fi