From 1ac3e937e9bd9e6fde4f989b5f7b034b01ee143e Mon Sep 17 00:00:00 2001
From: Aditya Maru
Date: Tue, 14 Dec 2021 14:02:50 -0500
Subject: [PATCH] backupccl: add BackupMonitor to memory monitor file stitching

This change adds a BackupMonitor that hangs off the bulk memory
monitor. This monitor currently only guards the queue that we use to
buffer SSTs while stitching them together in the sstSink.

Release note: None
---
 pkg/ccl/backupccl/BUILD.bazel                 |  3 +
 pkg/ccl/backupccl/backup_processor.go         | 46 +++++++++++--
 pkg/ccl/backupccl/backup_test.go              | 45 ++++++++++++
 .../full_cluster_backup_restore_test.go       |  4 ++
 pkg/ccl/backupccl/memory_utils.go             | 68 +++++++++++++++++++
 .../testdata/backup-restore/column-families   |  1 +
 pkg/server/server_sql.go                      |  2 +
 pkg/sql/exec_util.go                          |  5 ++
 pkg/sql/execinfra/server_config.go            |  4 ++
 9 files changed, 172 insertions(+), 6 deletions(-)
 create mode 100644 pkg/ccl/backupccl/memory_utils.go

diff --git a/pkg/ccl/backupccl/BUILD.bazel b/pkg/ccl/backupccl/BUILD.bazel
index 2ee0a2008567..72a294fd54c9 100644
--- a/pkg/ccl/backupccl/BUILD.bazel
+++ b/pkg/ccl/backupccl/BUILD.bazel
@@ -15,6 +15,7 @@ go_library(
         "create_scheduled_backup.go",
         "key_rewriter.go",
         "manifest_handling.go",
+        "memory_utils.go",
         "restoration_data.go",
         "restore_data_processor.go",
         "restore_job.go",
@@ -110,6 +111,7 @@ go_library(
         "//pkg/util/log",
         "//pkg/util/log/eventpb",
         "//pkg/util/metric",
+        "//pkg/util/mon",
         "//pkg/util/protoutil",
         "//pkg/util/retry",
         "//pkg/util/stop",
@@ -226,6 +228,7 @@ go_test(
         "//pkg/util/hlc",
         "//pkg/util/leaktest",
         "//pkg/util/log",
+        "//pkg/util/mon",
         "//pkg/util/protoutil",
         "//pkg/util/randutil",
         "//pkg/util/retry",
diff --git a/pkg/ccl/backupccl/backup_processor.go b/pkg/ccl/backupccl/backup_processor.go
index 8cf8c843243c..fe179f68e419 100644
--- a/pkg/ccl/backupccl/backup_processor.go
+++ b/pkg/ccl/backupccl/backup_processor.go
@@ -115,6 +115,9 @@ type backupDataProcessor struct {
 	cancelAndWaitForWorker func()
 	progCh                 chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress
 	backupErr              error
+
+	// Memory accumulator that reserves the memory usage of the backup processor.
+	backupMem *memoryAccumulator
 }
 
 var _ execinfra.Processor = &backupDataProcessor{}
@@ -127,11 +130,18 @@ func newBackupDataProcessor(
 	post *execinfrapb.PostProcessSpec,
 	output execinfra.RowReceiver,
 ) (execinfra.Processor, error) {
+	memMonitor := flowCtx.Cfg.BackupMonitor
+	if knobs, ok := flowCtx.TestingKnobs().BackupRestoreTestingKnobs.(*sql.BackupRestoreTestingKnobs); ok {
+		if knobs.BackupMemMonitor != nil {
+			memMonitor = knobs.BackupMemMonitor
+		}
+	}
 	bp := &backupDataProcessor{
-		flowCtx: flowCtx,
-		spec:    spec,
-		output:  output,
-		progCh:  make(chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress),
+		flowCtx:   flowCtx,
+		spec:      spec,
+		output:    output,
+		progCh:    make(chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress),
+		backupMem: newMemoryAccumulator(memMonitor),
 	}
 	if err := bp.Init(bp, post, backupOutputTypes, flowCtx, processorID, output, nil, /* memMonitor */
 		execinfra.ProcStateOpts{
@@ -160,7 +170,7 @@ func (bp *backupDataProcessor) Start(ctx context.Context) {
 			TaskName: "backup-worker",
 			SpanOpt:  stop.ChildSpan,
 		}, func(ctx context.Context) {
-			bp.backupErr = runBackupProcessor(ctx, bp.flowCtx, &bp.spec, bp.progCh)
+			bp.backupErr = runBackupProcessor(ctx, bp.flowCtx, &bp.spec, bp.progCh, bp.backupMem)
 			cancel()
 			close(bp.progCh)
 		}); err != nil {
@@ -196,6 +206,7 @@ func (bp *backupDataProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.Producer
 func (bp *backupDataProcessor) close() {
 	bp.cancelAndWaitForWorker()
 	bp.ProcessorBase.InternalClose()
+	bp.backupMem.close(bp.Ctx)
 }
 
 // ConsumerClosed is part of the RowSource interface. We have to override the
@@ -227,6 +238,7 @@ func runBackupProcessor(
 	flowCtx *execinfra.FlowCtx,
 	spec *execinfrapb.BackupDataSpec,
 	progCh chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress,
+	backupMem *memoryAccumulator,
 ) error {
 	backupProcessorSpan := tracing.SpanFromContext(ctx)
 	clusterSettings := flowCtx.Cfg.Settings
@@ -525,7 +537,10 @@ func runBackupProcessor(
 				return err
 			}
 
-			sink := &sstSink{conf: sinkConf, dest: storage}
+			sink, err := makeSSTSink(ctx, sinkConf, storage, backupMem)
+			if err != nil {
+				return err
+			}
 
 			defer func() {
 				err := sink.Close()
@@ -579,6 +594,20 @@ type sstSink struct {
 		sizeFlushes int
 		spanGrows   int
 	}
+
+	backupMem *memoryAccumulator
+}
+
+func makeSSTSink(
+	ctx context.Context, conf sstSinkConf, dest cloud.ExternalStorage, backupMem *memoryAccumulator,
+) (*sstSink, error) {
+	s := &sstSink{conf: conf, dest: dest, backupMem: backupMem}
+
+	// Reserve memory for the file buffer.
+	if err := s.backupMem.request(ctx, smallFileBuffer.Get(s.conf.settings)); err != nil {
+		return nil, errors.Wrap(err, "failed to reserve memory for sstSink queue")
+	}
+	return s, nil
 }
 
 func (s *sstSink) Close() error {
@@ -589,6 +618,10 @@ func (s *sstSink) Close() error {
 	if s.cancel != nil {
 		s.cancel()
 	}
+
+	// Release the memory reserved for the file buffer back to the memory
+	// accumulator.
+	s.backupMem.release(smallFileBuffer.Get(s.conf.settings))
 	if s.out != nil {
 		return s.out.Close()
 	}
@@ -696,6 +729,7 @@ func (s *sstSink) open(ctx context.Context) error {
 	}
 	s.out = w
 	s.sst = storage.MakeBackupSSTWriter(s.out)
+
 	return nil
 }
 
diff --git a/pkg/ccl/backupccl/backup_test.go b/pkg/ccl/backupccl/backup_test.go
index 92b72ba49317..5c4f57fe885a 100644
--- a/pkg/ccl/backupccl/backup_test.go
+++ b/pkg/ccl/backupccl/backup_test.go
@@ -17,6 +17,7 @@ import (
 	"hash/crc32"
 	"io"
 	"io/ioutil"
+	"math"
 	"math/rand"
 	"net/url"
 	"os"
@@ -74,6 +75,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
+	"github.com/cockroachdb/cockroach/pkg/util/mon"
 	"github.com/cockroachdb/cockroach/pkg/util/protoutil"
 	"github.com/cockroachdb/cockroach/pkg/util/randutil"
 	"github.com/cockroachdb/cockroach/pkg/util/retry"
@@ -9012,6 +9014,49 @@ CREATE SCHEMA db.s;
 	sqlDB.Exec(t, `BACKUP DATABASE db TO 'nodelocal://0/test/2'`)
 }
 
+func TestBackupMemMonitorSSTSinkQueueSize(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	memoryMonitor := mon.NewMonitor(
+		"test-mem",
+		mon.MemoryResource,
+		nil,           /* curCount */
+		nil,           /* maxHist */
+		-1,            /* increment */
+		math.MaxInt64, /* noteworthy */
+		cluster.MakeTestingClusterSettings(),
+	)
+	ctx := context.Background()
+	byteLimit := 14 << 20 // 14 MiB
+	memoryMonitor.Start(ctx, nil, mon.MakeStandaloneBudget(int64(byteLimit)))
+	defer memoryMonitor.Stop(ctx)
+	params := base.TestClusterArgs{}
+	knobs := base.TestingKnobs{
+		DistSQL: &execinfra.TestingKnobs{
+			BackupRestoreTestingKnobs: &sql.BackupRestoreTestingKnobs{
+				BackupMemMonitor: memoryMonitor,
+			}},
+	}
+	params.ServerArgs.Knobs = knobs
+
+	const numAccounts = 100
+
+	_, _, sqlDB, _, cleanup := backupRestoreTestSetupWithParams(t, singleNode, numAccounts,
+		InitManualReplication, params)
+	defer cleanup()
+
+	// Run a backup and expect the Grow() for the sstSink queue to return a memory
+	// error, since the default queue byte size (16MiB) exceeds the 14MiB budget.
+	sqlDB.ExpectErr(t, "failed to reserve memory for sstSink queue", `BACKUP INTO 'nodelocal://0/foo'`)
+
+	// Reduce the queue byte size cluster setting.
+	sqlDB.Exec(t, `SET CLUSTER SETTING bulkio.backup.merge_file_buffer_size = '13MiB'`)
+
+	// Now the backup should succeed because it is below the `byteLimit`.
+	sqlDB.Exec(t, `BACKUP INTO 'nodelocal://0/bar'`)
+}
+
 // TestBackupRestoreSeperateIncrementalPrefix tests that a backup/restore round
 // trip using the 'incremental_storage' parameter restores the same db as a BR
 // round trip without the parameter.
diff --git a/pkg/ccl/backupccl/full_cluster_backup_restore_test.go b/pkg/ccl/backupccl/full_cluster_backup_restore_test.go
index ae2e25f4db37..bbf96d347bca 100644
--- a/pkg/ccl/backupccl/full_cluster_backup_restore_test.go
+++ b/pkg/ccl/backupccl/full_cluster_backup_restore_test.go
@@ -923,6 +923,10 @@ func TestReintroduceOfflineSpans(t *testing.T) {
 	// and not report any progress in the meantime unless it is disabled.
 	srcDB.Exec(t, `SET CLUSTER SETTING bulkio.backup.file_size = '1'`)
 
+	// Test servers only have 128MB root memory monitors, so reduce the buffer
+	// size to avoid memory errors.
+	srcDB.Exec(t, `SET CLUSTER SETTING bulkio.backup.merge_file_buffer_size = '1MiB'`)
+
 	// Take a backup that we'll use to create an OFFLINE descriptor.
 	srcDB.Exec(t, `CREATE INDEX new_idx ON data.bank (balance)`)
 	srcDB.Exec(t, `BACKUP DATABASE data TO $1 WITH revision_history`, dbBackupLoc)
diff --git a/pkg/ccl/backupccl/memory_utils.go b/pkg/ccl/backupccl/memory_utils.go
new file mode 100644
index 000000000000..276fdda34ce4
--- /dev/null
+++ b/pkg/ccl/backupccl/memory_utils.go
@@ -0,0 +1,68 @@
+// Copyright 2017 The Cockroach Authors.
+//
+// Licensed as a CockroachDB Enterprise file under the Cockroach Community
+// License (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+//     https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt
+
+package backupccl
+
+import (
+	"context"
+
+	"github.com/cockroachdb/cockroach/pkg/util/mon"
+	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
+)
+
+// memoryAccumulator is a thin wrapper around a BoundAccount that only releases
+// memory from the bound account when it is closed; otherwise it accumulates
+// and re-uses resources.
+// This is useful when resources, once accumulated, should not be returned as
+// they may be needed later to make progress.
+// It is safe for concurrent use.
+type memoryAccumulator struct {
+	syncutil.Mutex
+	ba       mon.BoundAccount
+	reserved int64
+}
+
+// newMemoryAccumulator creates a new accumulator backed by a bound account
+// created from the given memory monitor.
+func newMemoryAccumulator(mm *mon.BytesMonitor) *memoryAccumulator {
+	return &memoryAccumulator{ba: mm.MakeBoundAccount()}
+}
+
+// request checks that the given number of bytes is available, requesting some
+// from the backing monitor if necessary.
+func (acc *memoryAccumulator) request(ctx context.Context, requested int64) error {
+	acc.Lock()
+	defer acc.Unlock()
+
+	if acc.reserved >= requested {
+		acc.reserved -= requested
+		return nil
+	}
+
+	requested -= acc.reserved
+	acc.reserved = 0
+
+	return acc.ba.Grow(ctx, requested)
+}
+
+// release releases a number of bytes back into the internal reserved pool.
+func (acc *memoryAccumulator) release(released int64) {
+	acc.Lock()
+	defer acc.Unlock()
+
+	acc.reserved += released
+}
+
+// close returns all accumulated memory to the backing monitor.
+func (acc *memoryAccumulator) close(ctx context.Context) {
+	acc.Lock()
+	defer acc.Unlock()
+
+	acc.reserved = 0
+	acc.ba.Close(ctx)
+}
diff --git a/pkg/ccl/backupccl/testdata/backup-restore/column-families b/pkg/ccl/backupccl/testdata/backup-restore/column-families
index bf16406c28d8..bec83fd05b79 100644
--- a/pkg/ccl/backupccl/testdata/backup-restore/column-families
+++ b/pkg/ccl/backupccl/testdata/backup-restore/column-families
@@ -10,6 +10,7 @@ ALTER TABLE cfs SPLIT AT SELECT a FROM cfs;
 -- Split the output files very small to catch output SSTs mid-row.
 SET CLUSTER SETTING bulkio.backup.file_size = '1';
 SET CLUSTER SETTING kv.bulk_sst.target_size = '1';
+SET CLUSTER SETTING bulkio.backup.merge_file_buffer_size = '1MiB';
 BACKUP cfs TO 'nodelocal://1/foo';
 CREATE DATABASE r1;
 RESTORE cfs FROM 'nodelocal://1/foo' WITH into_db='r1';
diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go
index aeb820d71111..112608642430 100644
--- a/pkg/server/server_sql.go
+++ b/pkg/server/server_sql.go
@@ -446,6 +446,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
 	bulkMemoryMonitor.Start(context.Background(), rootSQLMemoryMonitor, mon.BoundAccount{})
 
 	backfillMemoryMonitor := execinfra.NewMonitor(ctx, bulkMemoryMonitor, "backfill-mon")
+	backupMemoryMonitor := execinfra.NewMonitor(ctx, bulkMemoryMonitor, "backup-mon")
 
 	serverCacheMemoryMonitor := mon.NewMonitorInheritWithLimit(
 		"server-cache-mon", 0 /* limit */, rootSQLMemoryMonitor,
@@ -542,6 +543,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
 		VecFDSemaphore:    semaphore.New(envutil.EnvOrDefaultInt("COCKROACH_VEC_MAX_OPEN_FDS", colexec.VecMaxOpenFDsLimit)),
 		ParentDiskMonitor: cfg.TempStorageConfig.Mon,
 		BackfillerMonitor: backfillMemoryMonitor,
+		BackupMonitor:     backupMemoryMonitor,
 
 		ParentMemoryMonitor: rootSQLMemoryMonitor,
 		BulkAdder: func(
diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go
index e31bc32f6155..e1156fab5de3 100644
--- a/pkg/sql/exec_util.go
+++ b/pkg/sql/exec_util.go
@@ -1372,6 +1372,11 @@ type BackupRestoreTestingKnobs struct {
 	// RunAfterExportingSpanEntry allows blocking the BACKUP job after a single
 	// span has been exported.
 	RunAfterExportingSpanEntry func(ctx context.Context, response *roachpb.ExportResponse)
+
+	// BackupMemMonitor is used to override the monitor used by backup during
+	// testing. This is typically the bulk mem monitor if not
+	// specified here.
+	BackupMemMonitor *mon.BytesMonitor
 }
 
 var _ base.ModuleTestingKnobs = &BackupRestoreTestingKnobs{}
diff --git a/pkg/sql/execinfra/server_config.go b/pkg/sql/execinfra/server_config.go
index 75feb6fc9503..9eeda739e7d6 100644
--- a/pkg/sql/execinfra/server_config.go
+++ b/pkg/sql/execinfra/server_config.go
@@ -101,6 +101,10 @@ type ServerConfig struct {
 	// used by the column and index backfillers.
 	BackfillerMonitor *mon.BytesMonitor
 
+	// Child monitor of the bulk monitor which will be used to monitor the memory
+	// used during backup.
+	BackupMonitor *mon.BytesMonitor
+
 	// ParentDiskMonitor is normally the root disk monitor. It should only be used
 	// when setting up a server, a child monitor (usually belonging to a sql
 	// execution flow), or in tests. It is used to monitor temporary storage disk