Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

backupccl: add BackupMonitor to memory monitor file stitching #73805

Merged
merged 1 commit into from
Dec 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pkg/ccl/backupccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ go_library(
"create_scheduled_backup.go",
"key_rewriter.go",
"manifest_handling.go",
"memory_utils.go",
"restoration_data.go",
"restore_data_processor.go",
"restore_job.go",
Expand Down Expand Up @@ -110,6 +111,7 @@ go_library(
"//pkg/util/log",
"//pkg/util/log/eventpb",
"//pkg/util/metric",
"//pkg/util/mon",
"//pkg/util/protoutil",
"//pkg/util/retry",
"//pkg/util/stop",
Expand Down Expand Up @@ -226,6 +228,7 @@ go_test(
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/mon",
"//pkg/util/protoutil",
"//pkg/util/randutil",
"//pkg/util/retry",
Expand Down
46 changes: 40 additions & 6 deletions pkg/ccl/backupccl/backup_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@ type backupDataProcessor struct {
cancelAndWaitForWorker func()
progCh chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress
backupErr error

// backupMem is the memory accumulator used to reserve memory for the backup
// processor.
backupMem *memoryAccumulator
}

var _ execinfra.Processor = &backupDataProcessor{}
Expand All @@ -127,11 +130,18 @@ func newBackupDataProcessor(
post *execinfrapb.PostProcessSpec,
output execinfra.RowReceiver,
) (execinfra.Processor, error) {
memMonitor := flowCtx.Cfg.BackupMonitor
if knobs, ok := flowCtx.TestingKnobs().BackupRestoreTestingKnobs.(*sql.BackupRestoreTestingKnobs); ok {
if knobs.BackupMemMonitor != nil {
memMonitor = knobs.BackupMemMonitor
}
}
bp := &backupDataProcessor{
flowCtx: flowCtx,
spec: spec,
output: output,
progCh: make(chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress),
flowCtx: flowCtx,
spec: spec,
output: output,
progCh: make(chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress),
backupMem: newMemoryAccumulator(memMonitor),
}
if err := bp.Init(bp, post, backupOutputTypes, flowCtx, processorID, output, nil, /* memMonitor */
execinfra.ProcStateOpts{
Expand Down Expand Up @@ -160,7 +170,7 @@ func (bp *backupDataProcessor) Start(ctx context.Context) {
TaskName: "backup-worker",
SpanOpt: stop.ChildSpan,
}, func(ctx context.Context) {
bp.backupErr = runBackupProcessor(ctx, bp.flowCtx, &bp.spec, bp.progCh)
bp.backupErr = runBackupProcessor(ctx, bp.flowCtx, &bp.spec, bp.progCh, bp.backupMem)
cancel()
close(bp.progCh)
}); err != nil {
Expand Down Expand Up @@ -196,6 +206,7 @@ func (bp *backupDataProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.Producer
// close tears the processor down in a fixed order: it invokes
// cancelAndWaitForWorker, closes the processor's ProcessorBase, and finally
// closes the memory accumulator.
func (bp *backupDataProcessor) close() {
bp.cancelAndWaitForWorker()
bp.ProcessorBase.InternalClose()
// The worker goroutine is handed bp.backupMem, so the accumulator is closed
// only after cancelAndWaitForWorker has run.
bp.backupMem.close(bp.Ctx)
}

// ConsumerClosed is part of the RowSource interface. We have to override the
Expand Down Expand Up @@ -227,6 +238,7 @@ func runBackupProcessor(
flowCtx *execinfra.FlowCtx,
spec *execinfrapb.BackupDataSpec,
progCh chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress,
backupMem *memoryAccumulator,
) error {
backupProcessorSpan := tracing.SpanFromContext(ctx)
clusterSettings := flowCtx.Cfg.Settings
Expand Down Expand Up @@ -525,7 +537,10 @@ func runBackupProcessor(
return err
}

sink := &sstSink{conf: sinkConf, dest: storage}
sink, err := makeSSTSink(ctx, sinkConf, storage, backupMem)
if err != nil {
return err
}

defer func() {
err := sink.Close()
Expand Down Expand Up @@ -579,6 +594,20 @@ type sstSink struct {
sizeFlushes int
spanGrows int
}

backupMem *memoryAccumulator
}

// makeSSTSink constructs an sstSink writing to dest with the given
// configuration, charging the sink's file buffer against backupMem up front.
//
// The number of bytes requested is the current value of the smallFileBuffer
// setting. If the accumulator cannot provide that much memory, no sink is
// returned and the error is wrapped with a message identifying the sstSink
// queue reservation as the cause.
func makeSSTSink(
	ctx context.Context, conf sstSinkConf, dest cloud.ExternalStorage, backupMem *memoryAccumulator,
) (*sstSink, error) {
	sink := &sstSink{
		conf:      conf,
		dest:      dest,
		backupMem: backupMem,
	}

	// The buffer is accounted for before any data is queued so that an
	// over-budget configuration fails fast.
	bufferBytes := smallFileBuffer.Get(sink.conf.settings)
	err := sink.backupMem.request(ctx, bufferBytes)
	if err != nil {
		return nil, errors.Wrap(err, "failed to reserve memory for sstSink queue")
	}
	return sink, nil
}

func (s *sstSink) Close() error {
Expand All @@ -589,6 +618,10 @@ func (s *sstSink) Close() error {
if s.cancel != nil {
s.cancel()
}

// Release the memory reserved for the file buffer back to the memory
// accumulator.
s.backupMem.release(smallFileBuffer.Get(s.conf.settings))
if s.out != nil {
return s.out.Close()
}
Expand Down Expand Up @@ -696,6 +729,7 @@ func (s *sstSink) open(ctx context.Context) error {
}
s.out = w
s.sst = storage.MakeBackupSSTWriter(s.out)

return nil
}

Expand Down
45 changes: 45 additions & 0 deletions pkg/ccl/backupccl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"hash/crc32"
"io"
"io/ioutil"
"math"
"math/rand"
"net/url"
"os"
Expand Down Expand Up @@ -74,6 +75,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/retry"
Expand Down Expand Up @@ -9012,6 +9014,49 @@ CREATE SCHEMA db.s;
sqlDB.Exec(t, `BACKUP DATABASE db TO 'nodelocal://0/test/2'`)
}

// TestBackupMemMonitorSSTSinkQueueSize checks that the sstSink's buffer
// reservation is charged to the backup memory monitor. With a 14 MiB budget
// injected through the testing knobs, a backup using the default buffer size
// must fail to reserve memory, and must succeed once the buffer size setting
// is lowered below the budget.
func TestBackupMemMonitorSSTSinkQueueSize(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	ctx := context.Background()

	// Standalone monitor whose budget is smaller than the default buffer size.
	const budgetBytes int64 = 14 << 20 // 14 MiB
	testMon := mon.NewMonitor(
		"test-mem",
		mon.MemoryResource,
		nil,           /* curCount */
		nil,           /* maxHist */
		-1,            /* increment */
		math.MaxInt64, /* noteworthy */
		cluster.MakeTestingClusterSettings(),
	)
	testMon.Start(ctx, nil, mon.MakeStandaloneBudget(budgetBytes))
	defer testMon.Stop(ctx)

	// Route the backup processor's memory accounting through the test monitor.
	var params base.TestClusterArgs
	params.ServerArgs.Knobs = base.TestingKnobs{
		DistSQL: &execinfra.TestingKnobs{
			BackupRestoreTestingKnobs: &sql.BackupRestoreTestingKnobs{
				BackupMemMonitor: testMon,
			},
		},
	}

	const numAccounts = 100

	_, _, sqlDB, _, cleanup := backupRestoreTestSetupWithParams(
		t, singleNode, numAccounts, InitManualReplication, params,
	)
	defer cleanup()

	// The default queue byte size is 16MiB, which exceeds the budget, so the
	// sstSink's memory reservation is expected to fail.
	sqlDB.ExpectErr(t, "failed to reserve memory for sstSink queue", `BACKUP INTO 'nodelocal://0/foo'`)

	// Shrink the queue byte size cluster setting to fit within the budget.
	sqlDB.Exec(t, `SET CLUSTER SETTING bulkio.backup.merge_file_buffer_size = '13MiB'`)

	// With the buffer below the budget the backup should now succeed.
	sqlDB.Exec(t, `BACKUP INTO 'nodelocal://0/bar'`)
}

// TestBackupRestoreSeperateIncrementalPrefix tests that a backup/restore round
// trip using the 'incremental_storage' parameter restores the same db as a BR
// round trip without the parameter.
Expand Down
4 changes: 4 additions & 0 deletions pkg/ccl/backupccl/full_cluster_backup_restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -923,6 +923,10 @@ func TestReintroduceOfflineSpans(t *testing.T) {
// and not report any progress in the meantime unless it is disabled.
srcDB.Exec(t, `SET CLUSTER SETTING bulkio.backup.file_size = '1'`)

// Test servers only have 128MB root memory monitors, so reduce the buffer
// size to avoid memory errors.
srcDB.Exec(t, `SET CLUSTER SETTING bulkio.backup.merge_file_buffer_size = '1MiB'`)

// Take a backup that we'll use to create an OFFLINE descriptor.
srcDB.Exec(t, `CREATE INDEX new_idx ON data.bank (balance)`)
srcDB.Exec(t, `BACKUP DATABASE data TO $1 WITH revision_history`, dbBackupLoc)
Expand Down
68 changes: 68 additions & 0 deletions pkg/ccl/backupccl/memory_utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright 2017 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package backupccl

import (
"context"

"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
)

// memoryAccumulator is a thin wrapper around a BoundAccount that only releases
// memory from the bound account when it is closed; until then it accumulates
// memory and re-uses what has been released back to it.
// This is useful when resources, once accumulated, should not be returned
// because they may be needed later to make progress.
// It is safe for concurrent use: every method acquires the embedded mutex.
type memoryAccumulator struct {
syncutil.Mutex
// ba is the bound account that additional memory is grown from.
ba mon.BoundAccount
// reserved is the number of bytes that have been released back to the
// accumulator and can satisfy future requests without growing ba.
reserved int64
}

// newMemoryAccumulator returns an accumulator with an empty reserved pool
// whose bound account is created from the given memory monitor.
func newMemoryAccumulator(mm *mon.BytesMonitor) *memoryAccumulator {
	acc := &memoryAccumulator{}
	acc.ba = mm.MakeBoundAccount()
	return acc
}

// request checks that the given number of bytes is available, requesting some
// from the backing monitor if necessary.
func (acc *memoryAccumulator) request(ctx context.Context, requested int64) error {
acc.Lock()
defer acc.Unlock()

if acc.reserved >= requested {
acc.reserved -= requested
return nil
}

dt marked this conversation as resolved.
Show resolved Hide resolved
requested -= acc.reserved
acc.reserved = 0

return acc.ba.Grow(ctx, requested)
}

// release hands the given number of bytes back to the accumulator's internal
// reserved pool, where they can satisfy later requests. Nothing is returned to
// the bound account here.
func (acc *memoryAccumulator) release(released int64) {
	acc.Lock()
	acc.reserved += released
	acc.Unlock()
}

// close returns all accumulated memory to the backing monitor by closing the
// underlying bound account, and empties the reserved pool.
func (acc *memoryAccumulator) close(ctx context.Context) {
acc.Lock()
defer acc.Unlock()

// Clear the pool so no bytes are treated as re-usable once the bound account
// has been closed.
acc.reserved = 0
acc.ba.Close(ctx)
}
1 change: 1 addition & 0 deletions pkg/ccl/backupccl/testdata/backup-restore/column-families
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ ALTER TABLE cfs SPLIT AT SELECT a FROM cfs;
-- Split the output files very small to catch output SSTs mid-row.
SET CLUSTER SETTING bulkio.backup.file_size = '1';
SET CLUSTER SETTING kv.bulk_sst.target_size = '1';
SET CLUSTER SETTING bulkio.backup.merge_file_buffer_size = '1MiB';
BACKUP cfs TO 'nodelocal://1/foo';
CREATE DATABASE r1;
RESTORE cfs FROM 'nodelocal://1/foo' WITH into_db='r1';
Expand Down
2 changes: 2 additions & 0 deletions pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
bulkMemoryMonitor.Start(context.Background(), rootSQLMemoryMonitor, mon.BoundAccount{})

backfillMemoryMonitor := execinfra.NewMonitor(ctx, bulkMemoryMonitor, "backfill-mon")
backupMemoryMonitor := execinfra.NewMonitor(ctx, bulkMemoryMonitor, "backup-mon")

serverCacheMemoryMonitor := mon.NewMonitorInheritWithLimit(
"server-cache-mon", 0 /* limit */, rootSQLMemoryMonitor,
Expand Down Expand Up @@ -542,6 +543,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
VecFDSemaphore: semaphore.New(envutil.EnvOrDefaultInt("COCKROACH_VEC_MAX_OPEN_FDS", colexec.VecMaxOpenFDsLimit)),
ParentDiskMonitor: cfg.TempStorageConfig.Mon,
BackfillerMonitor: backfillMemoryMonitor,
BackupMonitor: backupMemoryMonitor,

ParentMemoryMonitor: rootSQLMemoryMonitor,
BulkAdder: func(
Expand Down
5 changes: 5 additions & 0 deletions pkg/sql/exec_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -1372,6 +1372,11 @@ type BackupRestoreTestingKnobs struct {
// RunAfterExportingSpanEntry allows blocking the BACKUP job after a single
// span has been exported.
RunAfterExportingSpanEntry func(ctx context.Context, response *roachpb.ExportResponse)

// BackupMemMonitor is used to override the monitor used by backup during
// testing. This is typically the bulk mem monitor if not
// specified here.
BackupMemMonitor *mon.BytesMonitor
}

var _ base.ModuleTestingKnobs = &BackupRestoreTestingKnobs{}
Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/execinfra/server_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ type ServerConfig struct {
// used by the column and index backfillers.
BackfillerMonitor *mon.BytesMonitor

// BackupMonitor is a child monitor of the bulk monitor which will be used to
// monitor the memory used during backup.
BackupMonitor *mon.BytesMonitor

// ParentDiskMonitor is normally the root disk monitor. It should only be used
// when setting up a server, a child monitor (usually belonging to a sql
// execution flow), or in tests. It is used to monitor temporary storage disk
Expand Down