Skip to content

Commit

Permalink
backupccl: replace memory accumulator with bound account
Browse files Browse the repository at this point in the history
This change replaces the memory accumulator with a raw bound
account. The memory accumulator provides a threadsafe wrapper
around the bound account, and some redundant resource pooling
semantics. Both of these are currently not needed by the sstSink
that is being memory monitored. This change deletes the memory
accumulator.

Release note: None
  • Loading branch information
adityamaru committed Jan 6, 2022
1 parent 00cb5d1 commit 21118a0
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 87 deletions.
1 change: 0 additions & 1 deletion pkg/ccl/backupccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ go_library(
"create_scheduled_backup.go",
"key_rewriter.go",
"manifest_handling.go",
"memory_utils.go",
"restoration_data.go",
"restore_data_processor.go",
"restore_job.go",
Expand Down
44 changes: 26 additions & 18 deletions pkg/ccl/backupccl/backup_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
Expand Down Expand Up @@ -123,8 +124,8 @@ type backupDataProcessor struct {
progCh chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress
backupErr error

// Memory accumulator that reserves the memory usage of the backup processor.
backupMem *memoryAccumulator
// BoundAccount that reserves the memory usage of the backup processor.
memAcc *mon.BoundAccount
}

var _ execinfra.Processor = &backupDataProcessor{}
Expand All @@ -143,12 +144,13 @@ func newBackupDataProcessor(
memMonitor = knobs.BackupMemMonitor
}
}
ba := memMonitor.MakeBoundAccount()
bp := &backupDataProcessor{
flowCtx: flowCtx,
spec: spec,
output: output,
progCh: make(chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress),
backupMem: newMemoryAccumulator(memMonitor),
flowCtx: flowCtx,
spec: spec,
output: output,
progCh: make(chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress),
memAcc: &ba,
}
if err := bp.Init(bp, post, backupOutputTypes, flowCtx, processorID, output, nil, /* memMonitor */
execinfra.ProcStateOpts{
Expand Down Expand Up @@ -177,7 +179,7 @@ func (bp *backupDataProcessor) Start(ctx context.Context) {
TaskName: "backup-worker",
SpanOpt: stop.ChildSpan,
}, func(ctx context.Context) {
bp.backupErr = runBackupProcessor(ctx, bp.flowCtx, &bp.spec, bp.progCh, bp.backupMem)
bp.backupErr = runBackupProcessor(ctx, bp.flowCtx, &bp.spec, bp.progCh, bp.memAcc)
cancel()
close(bp.progCh)
}); err != nil {
Expand Down Expand Up @@ -213,7 +215,7 @@ func (bp *backupDataProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.Producer
func (bp *backupDataProcessor) close() {
bp.cancelAndWaitForWorker()
bp.ProcessorBase.InternalClose()
bp.backupMem.close(bp.Ctx)
bp.memAcc.Close(bp.Ctx)
}

// ConsumerClosed is part of the RowSource interface. We have to override the
Expand Down Expand Up @@ -245,7 +247,7 @@ func runBackupProcessor(
flowCtx *execinfra.FlowCtx,
spec *execinfrapb.BackupDataSpec,
progCh chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress,
backupMem *memoryAccumulator,
memAcc *mon.BoundAccount,
) error {
backupProcessorSpan := tracing.SpanFromContext(ctx)
clusterSettings := flowCtx.Cfg.Settings
Expand Down Expand Up @@ -544,7 +546,7 @@ func runBackupProcessor(
return err
}

sink, err := makeSSTSink(ctx, sinkConf, storage, backupMem)
sink, err := makeSSTSink(ctx, sinkConf, storage, memAcc)
if err != nil {
return err
}
Expand Down Expand Up @@ -602,18 +604,24 @@ type sstSink struct {
spanGrows int
}

backupMem *memoryAccumulator
memAcc struct {
ba *mon.BoundAccount
reservedBytes int64
}
}

func makeSSTSink(
ctx context.Context, conf sstSinkConf, dest cloud.ExternalStorage, backupMem *memoryAccumulator,
ctx context.Context, conf sstSinkConf, dest cloud.ExternalStorage, backupMem *mon.BoundAccount,
) (*sstSink, error) {
s := &sstSink{conf: conf, dest: dest, backupMem: backupMem}
s := &sstSink{conf: conf, dest: dest}
s.memAcc.ba = backupMem

// Reserve memory for the file buffer.
if err := s.backupMem.request(ctx, smallFileBuffer.Get(s.conf.settings)); err != nil {
bufSize := smallFileBuffer.Get(s.conf.settings)
if err := s.memAcc.ba.Grow(ctx, bufSize); err != nil {
return nil, errors.Wrap(err, "failed to reserve memory for sstSink queue")
}
s.memAcc.reservedBytes += bufSize
return s, nil
}

Expand All @@ -626,9 +634,9 @@ func (s *sstSink) Close() error {
s.cancel()
}

// Release the memory reserved for the file buffer back to the memory
// accumulator.
s.backupMem.release(smallFileBuffer.Get(s.conf.settings))
// Release the memory reserved for the file buffer.
s.memAcc.ba.Shrink(s.ctx, s.memAcc.reservedBytes)
s.memAcc.reservedBytes = 0
if s.out != nil {
return s.out.Close()
}
Expand Down
68 changes: 0 additions & 68 deletions pkg/ccl/backupccl/memory_utils.go

This file was deleted.

0 comments on commit 21118a0

Please sign in to comment.