backupccl: add BackupMonitor to memory monitor file stitching
This change adds a BackupMonitor that hangs off the bulk memory
monitor. This monitor currently only guards the queue that we use
to buffer SSTs while stitching them together in the sstSink.

Release note: None
adityamaru committed Dec 15, 2021
1 parent 72d55ea commit 65c5ab6
Showing 9 changed files with 171 additions and 6 deletions.
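
Before the per-file diffs, a rough, self-contained sketch of the monitor hierarchy the commit wires up, assuming only the mon calls already shown in the test and server hunks below. The budget size and the "root-sql-mon"/"bulk-mon" labels are illustrative; "backup-mon" is the actual name introduced in server_sql.go, where the child monitors are built via execinfra.NewMonitor rather than directly as done here.

package main

import (
	"context"
	"math"

	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
	"github.com/cockroachdb/cockroach/pkg/util/mon"
)

func main() {
	ctx := context.Background()
	st := cluster.MakeTestingClusterSettings()

	// Root SQL memory monitor with a fixed budget (sizes here are arbitrary).
	root := mon.NewMonitor("root-sql-mon", mon.MemoryResource,
		nil /* curCount */, nil /* maxHist */, -1 /* increment */,
		math.MaxInt64 /* noteworthy */, st)
	root.Start(ctx, nil /* pool */, mon.MakeStandaloneBudget(128<<20))
	defer root.Stop(ctx)

	// Bulk monitor draws from the root monitor (server_sql.go builds these
	// children via execinfra.NewMonitor).
	bulk := mon.NewMonitor("bulk-mon", mon.MemoryResource,
		nil, nil, -1, math.MaxInt64, st)
	bulk.Start(ctx, root, mon.BoundAccount{})
	defer bulk.Stop(ctx)

	// The new "backup-mon" child added by this commit hangs off the bulk
	// monitor; each backup processor wraps it in a memoryAccumulator (see
	// memory_utils.go below) and the sstSink reserves its queue buffer from it.
	backup := mon.NewMonitor("backup-mon", mon.MemoryResource,
		nil, nil, -1, math.MaxInt64, st)
	backup.Start(ctx, bulk, mon.BoundAccount{})
	defer backup.Stop(ctx)
}
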
3 changes: 3 additions & 0 deletions pkg/ccl/backupccl/BUILD.bazel
@@ -15,6 +15,7 @@ go_library(
"create_scheduled_backup.go",
"key_rewriter.go",
"manifest_handling.go",
"memory_utils.go",
"restoration_data.go",
"restore_data_processor.go",
"restore_job.go",
@@ -110,6 +111,7 @@ go_library(
"//pkg/util/log",
"//pkg/util/log/eventpb",
"//pkg/util/metric",
"//pkg/util/mon",
"//pkg/util/protoutil",
"//pkg/util/retry",
"//pkg/util/stop",
@@ -223,6 +225,7 @@ go_test(
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/mon",
"//pkg/util/protoutil",
"//pkg/util/randutil",
"//pkg/util/retry",
37 changes: 31 additions & 6 deletions pkg/ccl/backupccl/backup_processor.go
@@ -114,6 +114,9 @@ type backupDataProcessor struct {
cancelAndWaitForWorker func()
progCh chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress
backupErr error

// Memory accumulator that reserves the memory used by the backup processor.
backupMem *memoryAccumulator
}

var _ execinfra.Processor = &backupDataProcessor{}
@@ -126,11 +129,18 @@ func newBackupDataProcessor(
post *execinfrapb.PostProcessSpec,
output execinfra.RowReceiver,
) (execinfra.Processor, error) {
memMonitor := flowCtx.Cfg.BackupMonitor
if knobs, ok := flowCtx.TestingKnobs().BackupRestoreTestingKnobs.(*sql.BackupRestoreTestingKnobs); ok {
if knobs.BackupMemMonitor != nil {
memMonitor = knobs.BackupMemMonitor
}
}
bp := &backupDataProcessor{
flowCtx: flowCtx,
spec: spec,
output: output,
progCh: make(chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress),
flowCtx: flowCtx,
spec: spec,
output: output,
progCh: make(chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress),
backupMem: newMemoryAccumulator(memMonitor),
}
if err := bp.Init(bp, post, backupOutputTypes, flowCtx, processorID, output, nil, /* memMonitor */
execinfra.ProcStateOpts{
@@ -159,7 +169,7 @@ func (bp *backupDataProcessor) Start(ctx context.Context) {
TaskName: "backup-worker",
SpanOpt: stop.ChildSpan,
}, func(ctx context.Context) {
bp.backupErr = runBackupProcessor(ctx, bp.flowCtx, &bp.spec, bp.progCh)
bp.backupErr = runBackupProcessor(ctx, bp.flowCtx, &bp.spec, bp.progCh, bp.backupMem)
cancel()
close(bp.progCh)
}); err != nil {
@@ -195,6 +205,9 @@ func (bp *backupDataProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.Producer
func (bp *backupDataProcessor) close() {
bp.cancelAndWaitForWorker()
bp.ProcessorBase.InternalClose()
if bp.backupMem != nil {
bp.backupMem.close(bp.Ctx)
}
}

// ConsumerClosed is part of the RowSource interface. We have to override the
@@ -226,6 +239,7 @@ func runBackupProcessor(
flowCtx *execinfra.FlowCtx,
spec *execinfrapb.BackupDataSpec,
progCh chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress,
backupMem *memoryAccumulator,
) error {
backupProcessorSpan := tracing.SpanFromContext(ctx)
clusterSettings := flowCtx.Cfg.Settings
@@ -524,7 +538,7 @@ func runBackupProcessor(
return err
}

sink := &sstSink{conf: sinkConf, dest: storage}
sink := &sstSink{conf: sinkConf, dest: storage, backupMem: backupMem}

defer func() {
err := sink.Close()
@@ -578,6 +592,8 @@ type sstSink struct {
sizeFlushes int
spanGrows int
}

backupMem *memoryAccumulator
}

func (s *sstSink) Close() error {
@@ -612,16 +628,19 @@ func (s *sstSink) push(ctx context.Context, resp returnedSST) error {
if drain < 1 {
drain = 1
}
var drainedByteSize int
for i := range s.queue[:drain] {
if err := s.write(ctx, s.queue[i]); err != nil {
return err
}
s.queueSize -= len(s.queue[i].sst)
drainedByteSize += len(s.queue[i].sst)
}

// Shift down the remainder of the queue and slice off the tail.
copy(s.queue, s.queue[drain:])
s.queue = s.queue[:len(s.queue)-drain]
s.backupMem.release(int64(drainedByteSize))
}
return nil
}
@@ -694,6 +713,12 @@ func (s *sstSink) open(ctx context.Context) error {
}
s.out = w
s.sst = storage.MakeBackupSSTWriter(s.out)

// Reserve memory for the queue that will be used to buffer writes before flushing.
if err := s.backupMem.request(ctx, smallFileBuffer.Get(s.conf.settings)); err != nil {
return errors.Wrap(err, "failed to reserve memory for sstSink queue")
}

return nil
}
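
To see the sstSink changes above in one place, here is a toy, standard-library-only sketch of the reserve/drain/release pattern they implement. toyBudget and toySink are hypothetical stand-ins for the memoryAccumulator and sstSink, and the sizes and the "drain half the queue" policy are illustrative; this is a sketch of the pattern, not the CockroachDB code.

package main

import (
	"errors"
	"fmt"
)

// toyBudget stands in for the memoryAccumulator backed by the backup monitor.
type toyBudget struct{ avail int64 }

func (b *toyBudget) reserve(n int64) error {
	if n > b.avail {
		return errors.New("failed to reserve memory for sstSink queue")
	}
	b.avail -= n
	return nil
}

func (b *toyBudget) release(n int64) { b.avail += n }

// toySink mirrors the parts of sstSink touched by this commit.
type toySink struct {
	mem        *toyBudget
	bufferSize int64    // reserved up front, like the buffer reserved in open()
	queue      [][]byte // buffered SSTs waiting to be stitched
	queueSize  int
}

// open reserves the whole queue buffer before any SSTs are queued.
func (s *toySink) open() error { return s.mem.reserve(s.bufferSize) }

// push queues an SST; once the queue exceeds the buffer size it drains part
// of it ("writing" the drained SSTs) and releases the drained bytes.
func (s *toySink) push(sst []byte) {
	s.queue = append(s.queue, sst)
	s.queueSize += len(sst)
	if int64(s.queueSize) <= s.bufferSize {
		return
	}
	drain := len(s.queue) / 2
	if drain < 1 {
		drain = 1
	}
	var drained int
	for _, q := range s.queue[:drain] {
		drained += len(q) // the real sink writes q out here
	}
	copy(s.queue, s.queue[drain:])
	s.queue = s.queue[:len(s.queue)-drain]
	s.queueSize -= drained
	s.mem.release(int64(drained))
}

func main() {
	b := &toyBudget{avail: 1 << 20}              // 1 MiB budget
	s := &toySink{mem: b, bufferSize: 512 << 10} // 512 KiB queue buffer
	if err := s.open(); err != nil {
		fmt.Println("backup fails up front:", err)
		return
	}
	s.push(make([]byte, 300<<10))
	s.push(make([]byte, 300<<10)) // over budget: drains one SST, releases 300 KiB
	fmt.Printf("queued %d bytes, budget has %d bytes left\n", s.queueSize, b.avail)
}
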

45 changes: 45 additions & 0 deletions pkg/ccl/backupccl/backup_test.go
@@ -17,6 +17,7 @@ import (
"hash/crc32"
"io"
"io/ioutil"
"math"
"math/rand"
"net/url"
"os"
@@ -74,6 +75,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/retry"
@@ -9012,3 +9014,46 @@ CREATE SCHEMA db.s;

sqlDB.Exec(t, `BACKUP DATABASE db TO 'nodelocal://0/test/2'`)
}

func TestBackupMemMonitorSSTSinkQueueSize(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

memoryMonitor := mon.NewMonitor(
"test-mem",
mon.MemoryResource,
nil, /* curCount */
nil, /* maxHist */
-1, /* increment */
math.MaxInt64, /* noteworthy */
cluster.MakeTestingClusterSettings(),
)
ctx := context.Background()
byteLimit := MultiNode * (14 << 20) // 14 MiB per node
memoryMonitor.Start(ctx, nil, mon.MakeStandaloneBudget(int64(byteLimit)))
defer memoryMonitor.Stop(ctx)
params := base.TestClusterArgs{}
knobs := base.TestingKnobs{
DistSQL: &execinfra.TestingKnobs{
BackupRestoreTestingKnobs: &sql.BackupRestoreTestingKnobs{
BackupMemMonitor: memoryMonitor,
}},
}
params.ServerArgs.Knobs = knobs

const numAccounts = 100

_, _, sqlDB, _, cleanup := backupRestoreTestSetupWithParams(t, MultiNode, numAccounts,
InitManualReplication, params)
defer cleanup()

// Run a backup and expect the Grow() call for the sstSink's buffer queue to
// return a memory error, since the default queue byte size is 16MiB.
sqlDB.ExpectErr(t, "failed to reserve memory for sstSink queue", `BACKUP INTO 'nodelocal://0/foo'`)

// Reduce the queue byte size cluster setting.
sqlDB.Exec(t, `SET CLUSTER SETTING bulkio.backup.merge_file_buffer_size = '5MiB'`)

// Now the backup should succeed because it is below the `byteLimit`.
sqlDB.Exec(t, `BACKUP INTO 'nodelocal://0/bar'`)
}
4 changes: 4 additions & 0 deletions pkg/ccl/backupccl/full_cluster_backup_restore_test.go
@@ -923,6 +923,10 @@ func TestReintroduceOfflineSpans(t *testing.T) {
// and not report any progress in the meantime unless it is disabled.
srcDB.Exec(t, `SET CLUSTER SETTING bulkio.backup.file_size = '1'`)

// Test servers only have 128MB root memory monitors, so reduce the buffer
// size to avoid memory errors.
srcDB.Exec(t, `SET CLUSTER SETTING bulkio.backup.merge_file_buffer_size = '1MiB'`)

// Take a backup that we'll use to create an OFFLINE descriptor.
srcDB.Exec(t, `CREATE INDEX new_idx ON data.bank (balance)`)
srcDB.Exec(t, `BACKUP DATABASE data TO $1 WITH revision_history`, dbBackupLoc)
76 changes: 76 additions & 0 deletions pkg/ccl/backupccl/memory_utils.go
@@ -0,0 +1,76 @@
// Copyright 2021 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package backupccl

import (
"context"

"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
)

// memoryAccumulator is a thin wrapper around a BoundAccount that only releases memory
// from the bound account when it is closed; otherwise it accumulates and
// re-uses resources.
// This is useful when resources, once accumulated, should not be returned, as
// they may be needed later to make progress.
// It is safe for concurrent use.
type memoryAccumulator struct {
syncutil.Mutex
ba mon.BoundAccount
reserved int64
}

// newMemoryAccumulator creates a new accumulator backed by a bound account created
// from the given memory monitor.
func newMemoryAccumulator(mm *mon.BytesMonitor) *memoryAccumulator {
if mm == nil {
return nil
}
return &memoryAccumulator{ba: mm.MakeBoundAccount()}
}

// request checks that the given number of bytes is available, requesting some
// from the backing monitor if necessary.
func (acc *memoryAccumulator) request(ctx context.Context, requested int64) error {
if acc == nil {
return nil
}

acc.Lock()
defer acc.Unlock()

if acc.reserved >= requested {
acc.reserved -= requested
return nil
}

return acc.ba.Grow(ctx, requested)
}

// release releases a number of bytes back into the internal reserved pool.
func (acc *memoryAccumulator) release(released int64) {
if acc == nil {
return
}

acc.Lock()
defer acc.Unlock()

acc.reserved += released
}

// close returns all accumulated memory to the backing monitor.
func (acc *memoryAccumulator) close(ctx context.Context) {
if acc == nil {
return
}

acc.ba.Close(ctx)
}
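
A minimal usage sketch of the accumulate-and-reuse behavior described in the comment above. It is not part of the commit: exampleAccumulatorReuse is a hypothetical helper assumed to live in package backupccl next to memory_utils.go, and the monitor construction mirrors TestBackupMemMonitorSSTSinkQueueSize earlier in this commit.

package backupccl

import (
	"context"
	"math"

	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
	"github.com/cockroachdb/cockroach/pkg/util/mon"
)

// exampleAccumulatorReuse (illustrative only) shows that release() returns
// bytes to the accumulator's reserved pool rather than to the monitor, so a
// later request() of the same size succeeds without growing the bound account.
func exampleAccumulatorReuse(ctx context.Context) error {
	mm := mon.NewMonitor(
		"example-mon",
		mon.MemoryResource,
		nil,           /* curCount */
		nil,           /* maxHist */
		-1,            /* increment */
		math.MaxInt64, /* noteworthy */
		cluster.MakeTestingClusterSettings(),
	)
	mm.Start(ctx, nil /* pool */, mon.MakeStandaloneBudget(4<<20 /* 4 MiB */))
	defer mm.Stop(ctx)

	acc := newMemoryAccumulator(mm)
	defer acc.close(ctx)

	// Grows the bound account by 1 MiB.
	if err := acc.request(ctx, 1<<20); err != nil {
		return err
	}
	// Returned to acc's reserved pool, not to the monitor.
	acc.release(1 << 20)
	// Served from the reserved pool; no further Grow() against the monitor.
	return acc.request(ctx, 1<<20)
}
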
1 change: 1 addition & 0 deletions pkg/ccl/backupccl/testdata/backup-restore/column-families
@@ -10,6 +10,7 @@ ALTER TABLE cfs SPLIT AT SELECT a FROM cfs;
-- Split the output files very small to catch output SSTs mid-row.
SET CLUSTER SETTING bulkio.backup.file_size = '1';
SET CLUSTER SETTING kv.bulk_sst.target_size = '1';
SET CLUSTER SETTING bulkio.backup.merge_file_buffer_size = '1MiB';
BACKUP cfs TO 'nodelocal://1/foo';
CREATE DATABASE r1;
RESTORE cfs FROM 'nodelocal://1/foo' WITH into_db='r1';
2 changes: 2 additions & 0 deletions pkg/server/server_sql.go
@@ -440,6 +440,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
bulkMemoryMonitor.Start(context.Background(), rootSQLMemoryMonitor, mon.BoundAccount{})

backfillMemoryMonitor := execinfra.NewMonitor(ctx, bulkMemoryMonitor, "backfill-mon")
backupMemoryMonitor := execinfra.NewMonitor(ctx, bulkMemoryMonitor, "backup-mon")

serverCacheMemoryMonitor := mon.NewMonitorInheritWithLimit(
"server-cache-mon", 0 /* limit */, rootSQLMemoryMonitor,
@@ -536,6 +537,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
VecFDSemaphore: semaphore.New(envutil.EnvOrDefaultInt("COCKROACH_VEC_MAX_OPEN_FDS", colexec.VecMaxOpenFDsLimit)),
ParentDiskMonitor: cfg.TempStorageConfig.Mon,
BackfillerMonitor: backfillMemoryMonitor,
BackupMonitor: backupMemoryMonitor,

ParentMemoryMonitor: rootSQLMemoryMonitor,
BulkAdder: func(
5 changes: 5 additions & 0 deletions pkg/sql/exec_util.go
@@ -1372,6 +1372,11 @@ type BackupRestoreTestingKnobs struct {
// RunAfterExportingSpanEntry allows blocking the BACKUP job after a single
// span has been exported.
RunAfterExportingSpanEntry func(ctx context.Context, response *roachpb.ExportResponse)

// BackupMemMonitor is used to override the memory monitor used by backup
// during testing. If not specified, this is typically the bulk memory
// monitor.
BackupMemMonitor *mon.BytesMonitor
}

var _ base.ModuleTestingKnobs = &BackupRestoreTestingKnobs{}
4 changes: 4 additions & 0 deletions pkg/sql/execinfra/server_config.go
@@ -101,6 +101,10 @@ type ServerConfig struct {
// used by the column and index backfillers.
BackfillerMonitor *mon.BytesMonitor

// BackupMonitor is a child monitor of the bulk monitor that tracks the
// memory used during backup.
BackupMonitor *mon.BytesMonitor

// ParentDiskMonitor is normally the root disk monitor. It should only be used
// when setting up a server, a child monitor (usually belonging to a sql
// execution flow), or in tests. It is used to monitor temporary storage disk